diff --git a/NATS.Client.sln b/NATS.Client.sln index d33a223cf..afd3e5adf 100644 --- a/NATS.Client.sln +++ b/NATS.Client.sln @@ -79,6 +79,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tes EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ObjectStore", "sandbox\Example.ObjectStore\Example.ObjectStore.csproj", "{51882883-A66E-4F95-A1AB-CFCBF71B4376}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nats.Client.Compat", "tests\Nats.Client.Compat\Nats.Client.Compat.csproj", "{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}" +EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services", "src\NATS.Client.Services\NATS.Client.Services.csproj", "{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services.Tests", "tests\NATS.Client.Services.Tests\NATS.Client.Services.Tests.csproj", "{749CAE39-4C1E-4627-9E31-A36B987BC453}" @@ -217,6 +219,10 @@ Global {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.Build.0 = Debug|Any CPU {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.ActiveCfg = Release|Any CPU {51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.Build.0 = Release|Any CPU + {35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Debug|Any CPU.Build.0 = Debug|Any CPU + {35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Release|Any CPU.ActiveCfg = Release|Any CPU + {35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Release|Any CPU.Build.0 = Release|Any CPU {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.Build.0 = Debug|Any CPU {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -284,6 +290,7 @@ Global {3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {51882883-A66E-4F95-A1AB-CFCBF71B4376} = {95A69671-16CA-4133-981C-CC381B7AAA30} + {35578296-FF6C-4BA4-BEE5-C1A6E7DB7024} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {050C63EE-8F1C-4535-9C6C-E12E62A1FF1D} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C} {749CAE39-4C1E-4627-9E31-A36B987BC453} = {C526E8AB-739A-48D7-8FC4-048978C9B650} {DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F} = {95A69671-16CA-4133-981C-CC381B7AAA30} diff --git a/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs b/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs index f81821370..5f96983c0 100644 --- a/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs +++ b/src/NATS.Client.ObjectStore/Internal/NatsObjJsonSerializer.cs @@ -11,6 +11,7 @@ internal static class NatsObjJsonSerializer [JsonSerializable(typeof(ObjectMetadata))] [JsonSerializable(typeof(MetaDataOptions))] +[JsonSerializable(typeof(NatsObjLink))] internal partial class NatsObjJsonSerializerContext : JsonSerializerContext { } diff --git a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs index 14b75e564..24fc6c82f 100644 --- a/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs +++ b/src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs @@ -10,6 +10,12 @@ public record ObjectMetadata [JsonPropertyName("name")] public string Name { get; set; } = default!; + /// + /// Object description + /// + [JsonPropertyName("description")] + public string Description { get; set; } = default!; + /// /// Bucket name /// @@ -52,9 +58,16 @@ public record ObjectMetadata /// /// Object metadata /// - [JsonPropertyName("meta")] + [JsonPropertyName("metadata")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public Dictionary Metadata { get; set; } = default!; + + /// + /// Object metadata + /// + [JsonPropertyName("headers")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - public Dictionary Meta { get; set; } = default!; + public Dictionary Headers { get; set; } = default!; /// /// Object deleted @@ -68,15 +81,39 @@ public record ObjectMetadata /// [JsonPropertyName("options")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - public MetaDataOptions Options { get; set; } = default!; + public MetaDataOptions? Options { get; set; } = default!; } public record MetaDataOptions { + /// + /// Link + /// + [JsonPropertyName("link")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public NatsObjLink? Link { get; set; } = default!; + /// /// Max chunk size /// [JsonPropertyName("max_chunk_size")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] - public int MaxChunkSize { get; set; } = default!; + public int? MaxChunkSize { get; set; } = default!; +} + +public record NatsObjLink +{ + /// + /// Link name + /// + [JsonPropertyName("name")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public string Name { get; set; } = default!; + + /// + /// Bucket name + /// + [JsonPropertyName("bucket")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] + public string Bucket { get; set; } = default!; } diff --git a/src/NATS.Client.ObjectStore/NatsObjConfig.cs b/src/NATS.Client.ObjectStore/NatsObjConfig.cs index 80a1f3029..30e3ae8d2 100644 --- a/src/NATS.Client.ObjectStore/NatsObjConfig.cs +++ b/src/NATS.Client.ObjectStore/NatsObjConfig.cs @@ -33,7 +33,7 @@ public record NatsObjConfig(string Bucket) /// /// Type of backing storage to use. /// - public NatsObjStorageType? Storage { get; init; } + public NatsObjStorageType Storage { get; init; } /// /// How many replicas to keep for each key. diff --git a/src/NATS.Client.ObjectStore/NatsObjContext.cs b/src/NATS.Client.ObjectStore/NatsObjContext.cs index a2e46a80a..f3c1087c7 100644 --- a/src/NATS.Client.ObjectStore/NatsObjContext.cs +++ b/src/NATS.Client.ObjectStore/NatsObjContext.cs @@ -58,10 +58,18 @@ public async ValueTask CreateObjectStore(NatsObjConfig config, Can AllowDirect = true, Metadata = config.Metadata!, Retention = StreamConfigurationRetention.limits, + Compression = StreamConfigurationCompression.none, }; var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken); - return new NatsObjStore(config, _context, stream); + return new NatsObjStore(config, this, _context, stream); + } + + public async ValueTask GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default) + { + ValidateBucketName(bucket); + var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken); + return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream); } /// diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 578a0c058..5808d7896 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -1,5 +1,7 @@ using System.Buffers; +using System.Runtime.CompilerServices; using System.Security.Cryptography; +using System.Text.Json; using System.Text.RegularExpressions; using NATS.Client.Core; using NATS.Client.Core.Internal; @@ -21,15 +23,18 @@ public class NatsObjStore private const string RollupSubject = "sub"; private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } }; + private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); private readonly string _bucket; + private readonly NatsObjContext _objContext; private readonly NatsJSContext _context; private readonly NatsJSStream _stream; - internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream) + internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, NatsJSStream stream) { _bucket = config.Bucket; + _objContext = objContext; _context = context; _stream = stream; } @@ -62,11 +67,20 @@ public async ValueTask GetAsync(string key, Stream stream, bool var info = await GetInfoAsync(key, cancellationToken: cancellationToken); + if (info.Options?.Link is { } link) + { + var store = await _objContext.GetObjectStoreAsync(link.Bucket, cancellationToken).ConfigureAwait(false); + return await store.GetAsync(link.Name, stream, leaveOpen, cancellationToken).ConfigureAwait(false); + } + await using var pushConsumer = new NatsJSOrderedPushConsumer>( _context, $"OBJ_{_bucket}", GetChunkSubject(info.Nuid), - new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all }, + new NatsJSOrderedPushConsumerOpts + { + DeliverPolicy = ConsumerConfigurationDeliverPolicy.all, + }, new NatsSubOpts(), cancellationToken); @@ -180,19 +194,19 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre meta.Nuid = nuid; meta.MTime = DateTimeOffset.UtcNow; - if (meta.Options == null!) + meta.Options ??= new MetaDataOptions { - meta.Options = new MetaDataOptions { MaxChunkSize = DefaultChunkSize }; - } + MaxChunkSize = DefaultChunkSize, + }; - if (meta.Options.MaxChunkSize == 0) + if (meta.Options.MaxChunkSize is null or <= 0) { meta.Options.MaxChunkSize = DefaultChunkSize; } var size = 0; var chunks = 0; - var chunkSize = meta.Options.MaxChunkSize; + var chunkSize = meta.Options.MaxChunkSize!.Value; string digest; using (var sha256 = SHA256.Create()) @@ -266,7 +280,10 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre { await _context.JSRequestResponseAsync( subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}", - request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) }, + request: new StreamPurgeRequest + { + Filter = GetChunkSubject(info.Nuid), + }, cancellationToken); } catch (NatsJSApiException e) @@ -279,6 +296,82 @@ await _context.JSRequestResponseAsync( return meta; } + public async ValueTask UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default) + { + ValidateObjectName(meta.Name); + + var info = await GetInfoAsync(key, cancellationToken: cancellationToken).ConfigureAwait(false); + + if (key != meta.Name) + { + // Make sure the new name is available + try + { + await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false); + throw new NatsObjException($"Object already exists: {meta.Name}"); + } + catch (NatsObjNotFoundException) + { + } + } + + info.Name = meta.Name; + info.Description = meta.Description; + info.Metadata = meta.Metadata; + info.Headers = meta.Headers; + + await PublishMeta(info, cancellationToken); + + return info; + } + + public async ValueTask AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default) + { + ValidateObjectName(link); + ValidateObjectName(target.Name); + + if (target.Deleted) + { + throw new NatsObjException("Can't link to a deleted object"); + } + + if (target.Options?.Link is not null) + { + throw new NatsObjException("Can't link to a linked object"); + } + + try + { + var checkLink = await GetInfoAsync(link, showDeleted: true, cancellationToken: cancellationToken).ConfigureAwait(false); + if (checkLink.Options?.Link is null) + { + throw new NatsObjException("Object already exists"); + } + } + catch (NatsObjNotFoundException) + { + } + + var info = new ObjectMetadata + { + Name = link, + Bucket = _bucket, + Nuid = NewNuid(), + Options = new MetaDataOptions + { + Link = new NatsObjLink + { + Name = target.Name, + Bucket = target.Bucket, + }, + }, + }; + + await PublishMeta(info, cancellationToken); + + return info; + } + /// /// Get object metadata by key. /// @@ -291,12 +384,16 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted { ValidateObjectName(key); - var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) }; + var request = new StreamMsgGetRequest + { + LastBySubj = GetMetaSubject(key), + }; try { var response = await _stream.GetAsync(request, cancellationToken); - var data = NatsObjJsonSerializer.Default.Deserialize(new ReadOnlySequence(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata"); + var base64String = Convert.FromBase64String(response.Message.Data); + var data = NatsObjJsonSerializer.Default.Deserialize(new ReadOnlySequence(base64String)) ?? throw new NatsObjException("Can't deserialize object metadata"); if (!showDeleted && data.Deleted) { @@ -316,6 +413,44 @@ public async ValueTask GetInfoAsync(string key, bool showDeleted } } + public async IAsyncEnumerable WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + opts ??= new NatsObjWatchOpts(); + + var deliverPolicy = ConsumerConfigurationDeliverPolicy.all; + if (opts.UpdatesOnly) + { + deliverPolicy = ConsumerConfigurationDeliverPolicy.@new; + } + + await using var pushConsumer = new NatsJSOrderedPushConsumer>( + _context, + $"OBJ_{_bucket}", + $"$O.{_bucket}.M.>", + new NatsJSOrderedPushConsumerOpts + { + DeliverPolicy = deliverPolicy, + }, + new NatsSubOpts(), + cancellationToken); + + pushConsumer.Init(); + + await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) + { + if (pushConsumer.IsDone) + continue; + using (msg.Data) + { + var info = JsonSerializer.Deserialize(msg.Data.Memory.Span, NatsObjJsonSerializerContext.Default.ObjectMetadata); + if (info != null) + { + yield return info; + } + } + } + } + /// /// Delete an object by key. /// @@ -382,3 +517,8 @@ private void ValidateObjectName(string name) } } } + +public record NatsObjWatchOpts +{ + public bool UpdatesOnly { get; init; } +} diff --git a/src/NATS.Client.Services/NatsSvcConfig.cs b/src/NATS.Client.Services/NatsSvcConfig.cs index fc357cf30..20cc88e2a 100644 --- a/src/NATS.Client.Services/NatsSvcConfig.cs +++ b/src/NATS.Client.Services/NatsSvcConfig.cs @@ -62,5 +62,5 @@ public NatsSvcConfig(string name, string version) /// Stats handler. JSON object returned by this handler will be included in /// the service stats data property. /// - public Func? StatsHandler { get; init; } + public Func? StatsHandler { get; init; } } diff --git a/src/NATS.Client.Services/NatsSvcEndPoint.cs b/src/NATS.Client.Services/NatsSvcEndPoint.cs index cf126a877..de3aedf5d 100644 --- a/src/NATS.Client.Services/NatsSvcEndPoint.cs +++ b/src/NATS.Client.Services/NatsSvcEndPoint.cs @@ -43,6 +43,11 @@ public interface INatsSvcEndpoint : IAsyncDisposable /// IDictionary? Metadata { get; } + /// + /// The name of the endpoint. + /// + string Name { get; } + /// /// The subject name to subscribe to. /// @@ -87,6 +92,9 @@ protected NatsSvcEndpointBase(NatsConnection connection, ISubscriptionManager ma /// public abstract IDictionary? Metadata { get; } + /// + public abstract string Name { get; } + internal abstract void IncrementErrors(); internal abstract void SetLastError(string error); @@ -101,7 +109,6 @@ public class NatsSvcEndpoint : NatsSvcEndpointBase private readonly ILogger _logger; private readonly Func, ValueTask> _handler; private readonly NatsConnection _nats; - private readonly string _name; private readonly CancellationToken _cancellationToken; private readonly Channel> _channel; private readonly INatsSerializer _serializer; @@ -129,7 +136,7 @@ public NatsSvcEndpoint(NatsConnection nats, string? queueGroup, string name, Fun _logger = nats.Opts.LoggerFactory.CreateLogger>(); _handler = handler; _nats = nats; - _name = name; + Name = name; Metadata = metadata; _cancellationToken = cancellationToken; _serializer = opts?.Serializer ?? _nats.Opts.Serializer; @@ -137,6 +144,9 @@ public NatsSvcEndpoint(NatsConnection nats, string? queueGroup, string name, Fun _handlerTask = Task.Run(HandlerLoop); } + /// + public override string Name { get; } + /// public override long Requests => Volatile.Read(ref _requests); @@ -184,7 +194,7 @@ protected override ValueTask ReceiveInternalAsync( } catch (Exception e) { - _logger.LogError(e, "Endpoint {Name} error building message", _name); + _logger.LogError(e, "Endpoint {Name} error building message", Name); exception = e; // Most likely a serialization error. @@ -229,7 +239,7 @@ private async Task HandlerLoop() body = string.Empty; // Only log unknown exceptions - _logger.LogError(e, "Endpoint {Name} error processing message", _name); + _logger.LogError(e, "Endpoint {Name} error processing message", Name); } try @@ -245,7 +255,7 @@ private async Task HandlerLoop() } catch (Exception e1) { - _logger.LogError(e1, "Endpoint {Name} error responding", _name); + _logger.LogError(e1, "Endpoint {Name} error responding", Name); } } finally diff --git a/src/NATS.Client.Services/NatsSvcMsg.cs b/src/NATS.Client.Services/NatsSvcMsg.cs index d4aa8ac5b..61bbe7832 100644 --- a/src/NATS.Client.Services/NatsSvcMsg.cs +++ b/src/NATS.Client.Services/NatsSvcMsg.cs @@ -108,11 +108,11 @@ public ValueTask ReplyErrorAsync(int code, string message, TReply data, public ValueTask ReplyErrorAsync(int code, string message, NatsHeaders? headers = default, string? replyTo = default, NatsPubOpts? opts = default, CancellationToken cancellationToken = default) { headers ??= new NatsHeaders(); - headers.Add("Nats-Service-Error-Code", $"{code}"); headers.Add("Nats-Service-Error", $"{message}"); + headers.Add("Nats-Service-Error-Code", $"{code}"); _endPoint?.IncrementErrors(); - _endPoint?.SetLastError($"{message} ({code})"); + _endPoint?.SetLastError($"{code}:{message}"); return ReplyAsync(headers: headers, replyTo: replyTo, opts: opts, cancellationToken: cancellationToken); } diff --git a/src/NATS.Client.Services/NatsSvcServer.cs b/src/NATS.Client.Services/NatsSvcServer.cs index e4eaa324f..2e4a73294 100644 --- a/src/NATS.Client.Services/NatsSvcServer.cs +++ b/src/NATS.Client.Services/NatsSvcServer.cs @@ -52,6 +52,10 @@ public NatsSvcServer(NatsConnection nats, NatsSvcConfig config, CancellationToke /// A representing the asynchronous operation. public async ValueTask StopAsync(CancellationToken cancellationToken = default) { + // Return here when disposed if we already stopped + if (_cts.Token.IsCancellationRequested) + return; + foreach (var listener in _svcListeners) { await listener.DisposeAsync(); @@ -78,15 +82,21 @@ public async ValueTask StopAsync(CancellationToken cancellationToken = default) /// Callback for handling incoming messages. /// Optional endpoint name. /// Optional endpoint subject. + /// Queue group name (defaults to service group's). /// Optional endpoint metadata. + /// Serializer to use when deserializing incoming messages (defaults to connection's serializer). /// A used to stop the endpoint. /// Serialization type for messages received. /// A representing the asynchronous operation. /// /// One of name or subject must be specified. /// - public ValueTask AddEndpointAsync(Func, ValueTask> handler, string? name = default, string? subject = default, IDictionary? metadata = default, CancellationToken cancellationToken = default) => - AddEndpointInternalAsync(handler, name, subject, _config.QueueGroup, metadata, cancellationToken); + public ValueTask AddEndpointAsync(Func, ValueTask> handler, string? name = default, string? subject = default, string? queueGroup = default, IDictionary? metadata = default, INatsSerializer? serializer = default, CancellationToken cancellationToken = default) + { + serializer ??= _nats.Opts.Serializer; + queueGroup ??= _config.QueueGroup; + return AddEndpointInternalAsync(handler, name, subject, queueGroup, metadata, serializer, cancellationToken); + } /// /// Adds a new service group with optional queue group. @@ -126,12 +136,12 @@ internal async ValueTask StartAsync() } } - private async ValueTask AddEndpointInternalAsync(Func, ValueTask> handler, string? name, string? subject, string? queueGroup, IDictionary? metadata, CancellationToken cancellationToken) + private async ValueTask AddEndpointInternalAsync(Func, ValueTask> handler, string? name, string? subject, string? queueGroup, IDictionary? metadata, INatsSerializer serializer, CancellationToken cancellationToken) { var epSubject = subject ?? name ?? throw new NatsSvcException("Either name or subject must be specified"); - var epName = name ?? epSubject; + var epName = name ?? epSubject.Replace(".", "-"); - var ep = new NatsSvcEndpoint(_nats, queueGroup, epName, handler, epSubject, metadata, opts: default, cancellationToken); + var ep = new NatsSvcEndpoint(_nats, queueGroup, epName, handler, epSubject, metadata, opts: new NatsSubOpts { Serializer = serializer }, cancellationToken); if (!_endPoints.TryAdd(epName, ep)) { @@ -161,7 +171,13 @@ private async Task MsgLoop() } await svcMsg.Msg.ReplyAsync( - new PingResponse { Name = _config.Name, Id = _id, Version = _config.Version, }, + new PingResponse + { + Name = _config.Name, + Id = _id, + Version = _config.Version, + Metadata = _config.Metadata!, + }, opts: new NatsPubOpts { Serializer = NatsSrvJsonSerializer.Default }, cancellationToken: _cancellationToken); } @@ -205,7 +221,7 @@ await svcMsg.Msg.ReplyAsync( JsonNode? statsData; try { - statsData = _config.StatsHandler?.Invoke(); + statsData = _config.StatsHandler?.Invoke(ep.Value); } catch (Exception ex) { @@ -222,7 +238,7 @@ await svcMsg.Msg.ReplyAsync( ProcessingTime = ep.Value.ProcessingTime, NumRequests = ep.Value.Requests, NumErrors = ep.Value.Errors, - LastError = ep.Value.LastError!, + LastError = ep.Value.LastError ?? string.Empty, AverageProcessingTime = ep.Value.AverageProcessingTime, }; }).ToList(); @@ -286,19 +302,22 @@ public Group(NatsSvcServer server, string groupName, string? queueGroup = defaul /// Callback for handling incoming messages. /// Optional endpoint name. /// Optional endpoint subject. + /// Queue group name (defaults to service group's). /// Optional endpoint metadata. + /// Serializer to use when deserializing incoming messages (defaults to connection's serializer). /// A used to stop the endpoint. /// Serialization type for messages received. /// A representing the asynchronous operation. /// /// One of name or subject must be specified. /// - public ValueTask AddEndpointAsync(Func, ValueTask> handler, string? name = default, string? subject = default, IDictionary? metadata = default, CancellationToken cancellationToken = default) + public ValueTask AddEndpointAsync(Func, ValueTask> handler, string? name = default, string? subject = default, string? queueGroup = default, IDictionary? metadata = default, INatsSerializer? serializer = default, CancellationToken cancellationToken = default) { - var epName = name != null ? $"{GroupName}{_dot}{name}" : null; + subject ??= name; var epSubject = subject != null ? $"{GroupName}{_dot}{subject}" : null; - var queueGroup = QueueGroup ?? _server._config.QueueGroup; - return _server.AddEndpointInternalAsync(handler, epName, epSubject, queueGroup, metadata, cancellationToken); + queueGroup ??= QueueGroup ?? _server._config.QueueGroup; + serializer ??= _server._nats.Opts.Serializer; + return _server.AddEndpointInternalAsync(handler, name, epSubject, queueGroup, metadata, serializer, cancellationToken); } /// diff --git a/tests/NATS.Client.CheckNativeAot/Program.cs b/tests/NATS.Client.CheckNativeAot/Program.cs index 239beb9a9..5143d9bfa 100644 --- a/tests/NATS.Client.CheckNativeAot/Program.cs +++ b/tests/NATS.Client.CheckNativeAot/Program.cs @@ -327,14 +327,14 @@ await grp2.AddEndpointAsync( Assert.Equal("foo.baz", info.Endpoints.First(e => e.Name == "baz").Subject); Assert.Equal("q", info.Endpoints.First(e => e.Name == "baz").QueueGroup); - Assert.Equal("foo.bar1", info.Endpoints.First(e => e.Name == "foo.bar1").Subject); - Assert.Equal("q", info.Endpoints.First(e => e.Name == "foo.bar1").QueueGroup); + Assert.Equal("foo.bar1", info.Endpoints.First(e => e.Name == "foo-bar1").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "foo-bar1").QueueGroup); - Assert.Equal("grp1.e1", info.Endpoints.First(e => e.Name == "grp1.e1").Subject); - Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e1").QueueGroup); + Assert.Equal("grp1.e1", info.Endpoints.First(e => e.Name == "e1").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "e1").QueueGroup); - Assert.Equal("grp1.foo.bar2", info.Endpoints.First(e => e.Name == "grp1.e2").Subject); - Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e2").QueueGroup); + Assert.Equal("grp1.foo.bar2", info.Endpoints.First(e => e.Name == "e2").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "e2").QueueGroup); Assert.Equal("foo.empty1", info.Endpoints.First(e => e.Name == "empty1").Subject); Assert.Equal("q_empty", info.Endpoints.First(e => e.Name == "empty1").QueueGroup); @@ -346,7 +346,7 @@ await grp2.AddEndpointAsync( Description = "es-two", QueueGroup = "q2", Metadata = new Dictionary { { "k1", "v1" }, { "k2", "v2" }, }, - StatsHandler = () => JsonNode.Parse("{\"stat-k1\":\"stat-v1\",\"stat-k2\":\"stat-v2\"}")!, + StatsHandler = ep => JsonNode.Parse($"{{\"stat-k1\":\"stat-v1\",\"stat-k2\":\"stat-v2\",\"ep_name\": \"{ep.Name}\"}}")!, }, cancellationToken: cancellationToken); @@ -375,6 +375,7 @@ await s2.AddEndpointAsync( var eps = stat.Endpoints.First(); Assert.Equal("stat-v1", eps.Data["stat-k1"]?.GetValue()); Assert.Equal("stat-v2", eps.Data["stat-k2"]?.GetValue()); + Assert.Equal("s2baz", eps.Data["ep_name"]?.GetValue()); } Log("OK"); @@ -448,7 +449,7 @@ await s1.AddEndpointAsync( Assert.Equal("e1", endpointStats.Name); Assert.Equal(10, endpointStats.NumRequests); Assert.Equal(3, endpointStats.NumErrors); - Assert.Equal("Handler error (999)", endpointStats.LastError); + Assert.Equal("999:Handler error", endpointStats.LastError); Assert.True(endpointStats.ProcessingTime > 0); Assert.True(endpointStats.AverageProcessingTime > 0); diff --git a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs index 4fbee3a18..3d033012a 100644 --- a/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs @@ -118,13 +118,13 @@ public async Task Delete_one_msg() stream = await js.GetStreamAsync("s1", new StreamInfoRequest() { SubjectsFilter = "s1.*" }, cts.Token); - Assert.Equal(3, stream.Info.State.Subjects.Count); + Assert.Equal(3, stream.Info.State.Subjects?.Count); var deleteResponse = await js.DeleteMessageAsync("s1", new StreamMsgDeleteRequest { Seq = 1 }, cts.Token); Assert.True(deleteResponse.Success); stream = await js.GetStreamAsync("s1", new StreamInfoRequest() { SubjectsFilter = "s1.*" }, cts.Token); - Assert.Equal(2, stream.Info.State.Subjects.Count); + Assert.Equal(2, stream.Info.State.Subjects?.Count); } } diff --git a/tests/NATS.Client.Services.Tests/ServicesTests.cs b/tests/NATS.Client.Services.Tests/ServicesTests.cs index 9b6d6acad..e8812d0eb 100644 --- a/tests/NATS.Client.Services.Tests/ServicesTests.cs +++ b/tests/NATS.Client.Services.Tests/ServicesTests.cs @@ -113,7 +113,7 @@ await s1.AddEndpointAsync( Assert.Equal("e1", endpointStats.Name); Assert.Equal(10, endpointStats.NumRequests); Assert.Equal(3, endpointStats.NumErrors); - Assert.Equal("Handler error (999)", endpointStats.LastError); + Assert.Equal("999:Handler error", endpointStats.LastError); Assert.True(endpointStats.ProcessingTime > 0); Assert.True(endpointStats.AverageProcessingTime > 0); } @@ -171,14 +171,14 @@ await grp2.AddEndpointAsync( Assert.Equal("foo.baz", info.Endpoints.First(e => e.Name == "baz").Subject); Assert.Equal("q", info.Endpoints.First(e => e.Name == "baz").QueueGroup); - Assert.Equal("foo.bar1", info.Endpoints.First(e => e.Name == "foo.bar1").Subject); - Assert.Equal("q", info.Endpoints.First(e => e.Name == "foo.bar1").QueueGroup); + Assert.Equal("foo.bar1", info.Endpoints.First(e => e.Name == "foo-bar1").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "foo-bar1").QueueGroup); - Assert.Equal("grp1.e1", info.Endpoints.First(e => e.Name == "grp1.e1").Subject); - Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e1").QueueGroup); + Assert.Equal("grp1.e1", info.Endpoints.First(e => e.Name == "e1").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "e1").QueueGroup); - Assert.Equal("grp1.foo.bar2", info.Endpoints.First(e => e.Name == "grp1.e2").Subject); - Assert.Equal("q", info.Endpoints.First(e => e.Name == "grp1.e2").QueueGroup); + Assert.Equal("grp1.foo.bar2", info.Endpoints.First(e => e.Name == "e2").Subject); + Assert.Equal("q", info.Endpoints.First(e => e.Name == "e2").QueueGroup); Assert.Equal("foo.empty1", info.Endpoints.First(e => e.Name == "empty1").Subject); Assert.Equal("q_empty", info.Endpoints.First(e => e.Name == "empty1").QueueGroup); @@ -190,7 +190,7 @@ await grp2.AddEndpointAsync( Description = "es-two", QueueGroup = "q2", Metadata = new Dictionary { { "k1", "v1" }, { "k2", "v2" }, }, - StatsHandler = () => JsonNode.Parse("{\"stat-k1\":\"stat-v1\",\"stat-k2\":\"stat-v2\"}")!, + StatsHandler = ep => JsonNode.Parse($"{{\"stat-k1\":\"stat-v1\",\"stat-k2\":\"stat-v2\",\"ep_name\": \"{ep.Name}\"}}")!, }, cancellationToken: cancellationToken); @@ -219,6 +219,7 @@ await s2.AddEndpointAsync( var eps = stat.Endpoints.First(); Assert.Equal("stat-v1", eps.Data["stat-k1"]?.GetValue()); Assert.Equal("stat-v2", eps.Data["stat-k2"]?.GetValue()); + Assert.Equal("s2baz", eps.Data["ep_name"]?.GetValue()); } } diff --git a/tests/Nats.Client.Compat/Nats.Client.Compat.csproj b/tests/Nats.Client.Compat/Nats.Client.Compat.csproj new file mode 100644 index 000000000..20d8005a9 --- /dev/null +++ b/tests/Nats.Client.Compat/Nats.Client.Compat.csproj @@ -0,0 +1,16 @@ + + + + Exe + net6.0 + enable + enable + false + + + + + + + + diff --git a/tests/Nats.Client.Compat/ObjectStoreCompat.cs b/tests/Nats.Client.Compat/ObjectStoreCompat.cs new file mode 100644 index 000000000..16abd2a01 --- /dev/null +++ b/tests/Nats.Client.Compat/ObjectStoreCompat.cs @@ -0,0 +1,192 @@ +using System.Security.Cryptography; +using System.Text.Json.Nodes; +using System.Threading.Channels; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.ObjectStore; +using NATS.Client.ObjectStore.Models; + +// ReSharper disable UnusedMember.Global +// ReSharper disable UnusedType.Global +#pragma warning disable CS8602 // Dereference of a possibly null reference. + +namespace Nats.Client.Compat; + +public class ObjectStoreCompat +{ + public async Task TestDefaultBucket(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var config = json["config"]; + + await ob.CreateObjectStore(new NatsObjConfig(config["bucket"].GetValue())); + await msg.ReplyAsync(); + } + + public async Task TestCustomBucket(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var config = json["config"]; + + Enum.TryParse(config["storage"].GetValue(), true, out NatsObjStorageType storage); + var description = config["description"].GetValue(); + var fromSeconds = TimeSpan.FromSeconds(config["max_age"].GetValue() / 1_000_000_000.0); + var bucket = config["bucket"].GetValue(); + var numberOfReplicas = config["num_replicas"].GetValue(); + + await ob.CreateObjectStore(new NatsObjConfig(bucket) + { + Description = description, + MaxAge = fromSeconds, + Storage = storage, + NumberOfReplicas = numberOfReplicas, + }); + + await msg.ReplyAsync(); + } + + public async Task TestPutObject(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var url = json["url"].GetValue(); + var config = json["config"]; + + var store = await ob.GetObjectStoreAsync("test"); + + await store.PutAsync( + new ObjectMetadata + { + Name = config["name"].GetValue(), + Description = config["description"].GetValue(), + }, + await new HttpClient().GetStreamAsync(url)); + + await msg.ReplyAsync(); + } + + public async Task TestGetObject(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var bucket = json["bucket"].GetValue(); + var objectName = json["object"].GetValue(); + + var store = await ob.GetObjectStoreAsync(bucket); + + var bytes = await store.GetBytesAsync(objectName); + + var sha256 = SHA256.HashData(bytes); + + await msg.ReplyAsync(sha256); + } + + public async Task TestUpdateMetadata(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var bucket = json["bucket"].GetValue(); + var objectName = json["object"].GetValue(); + var name = json["config"]["name"].GetValue(); + var description = json["config"]["description"].GetValue(); + + var store = await ob.GetObjectStoreAsync(bucket); + + await store.UpdateMetaAsync(objectName, new ObjectMetadata + { + Name = name, + Description = description, + }); + + await msg.ReplyAsync(); + } + + public async Task TestWatchUpdates(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var bucket = json["bucket"].GetValue(); + + var store = await ob.GetObjectStoreAsync(bucket); + + await foreach (var info in store.WatchAsync(new NatsObjWatchOpts { UpdatesOnly = true })) + { + await msg.ReplyAsync(info.Digest); + break; + } + } + + public async Task TestWatch(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var bucket = json["bucket"].GetValue(); + + var store = await ob.GetObjectStoreAsync(bucket); + + var list = new List(); + await foreach (var info in store.WatchAsync()) + { + list.Add(info.Digest); + if (list.Count == 2) + break; + } + + await msg.ReplyAsync($"{list[0]},{list[1]}"); + } + + public async Task TestGetLink(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var bucket = json["bucket"].GetValue(); + var objectName = json["object"].GetValue(); + + var store = await ob.GetObjectStoreAsync(bucket); + + var bytes = await store.GetBytesAsync(objectName); + + var sha256 = SHA256.HashData(bytes); + + await msg.ReplyAsync(sha256); + } + + public async Task TestPutLink(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var ob = new NatsObjContext(new NatsJSContext(nats)); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var bucket = json["bucket"].GetValue(); + var objectName = json["object"].GetValue(); + var linkName = json["link_name"].GetValue(); + + var store = await ob.GetObjectStoreAsync(bucket); + + var target = await store.GetInfoAsync(objectName); + + await store.AddLinkAsync(linkName, target); + + await msg.ReplyAsync(); + } +} diff --git a/tests/Nats.Client.Compat/Program.cs b/tests/Nats.Client.Compat/Program.cs new file mode 100644 index 000000000..334859646 --- /dev/null +++ b/tests/Nats.Client.Compat/Program.cs @@ -0,0 +1,3 @@ +using Nats.Client.Compat; + +await Test.RunAsync(); diff --git a/tests/Nats.Client.Compat/ServiceCompat.cs b/tests/Nats.Client.Compat/ServiceCompat.cs new file mode 100644 index 000000000..3bb672fa6 --- /dev/null +++ b/tests/Nats.Client.Compat/ServiceCompat.cs @@ -0,0 +1,107 @@ +using System.Text.Json.Nodes; +using System.Threading.Channels; +using NATS.Client.Core; +using NATS.Client.Services; + +// ReSharper disable UnusedMember.Global +// ReSharper disable UnusedType.Global +#pragma warning disable CS8602 // Dereference of a possibly null reference. + +namespace Nats.Client.Compat; + +public class ServiceCompat +{ + public async Task TestCore(NatsConnection nats, NatsMsg> msg, ChannelReader>> reader) + { + var svc = new NatsSvcContext(nats); + var json = JsonNode.Parse(msg.Data.Span); + + // Test.Log($"JSON: {json}"); + var config = json["config"]; + var svcName = config["name"].GetValue(); + var version = config["version"].GetValue(); + var svcConfig = new NatsSvcConfig(svcName, version) + { + Description = config["description"].GetValue(), + Metadata = config["metadata"]?.AsObject().ToDictionary(kv => kv.Key, kv => kv.Value.GetValue()), + QueueGroup = config["queue_group"]?.GetValue() ?? "q", + StatsHandler = ep => JsonNode.Parse($"{{\"endpoint\":\"{ep.Name}\"}}")!, + }; + + await using var server = await svc.AddServiceAsync(svcConfig); + + var groupNames = config["groups"] + .AsArray() + .Select(node => ( + Name: node["name"].GetValue(), + QueueGroup: node["queue_group"]?.GetValue())) + .ToDictionary(kv => kv.Name, kv => kv.QueueGroup); + + Dictionary groups = new(); + foreach (var (key, value) in groupNames) + { + var group = await server.AddGroupAsync(key, value); + groups.Add(key, group); + } + + Func>, ValueTask> echoHandler = async m => + { + var memory = new Memory(new byte[m.Data.Memory.Length]); + using (m.Data) + m.Data.Memory.CopyTo(memory); + await m.ReplyAsync(memory); + }; + + Func>, ValueTask> faultHandler = async m => + { + await m.ReplyErrorAsync(500, "handler error"); + }; + + foreach (var node in config["endpoints"].AsArray()) + { + var name = node["name"]?.GetValue(); + var subject = node["subject"]?.GetValue(); + var metadata = node["metadata"]?.AsObject().ToDictionary(kv => kv.Key, kv => kv.Value.GetValue()); + var queueGroup = node["queue_group"]?.GetValue(); + var group = node["group"]?.GetValue(); + + var handler = name == "faulty" ? faultHandler : echoHandler; + + if (group is not null) + { + await groups[group].AddEndpointAsync( + handler: handler, + name: name, + subject: subject, + queueGroup: queueGroup, + metadata: metadata, + serializer: default); + } + else + { + await server.AddEndpointAsync( + handler: handler, + name: name, + subject: subject, + queueGroup: queueGroup, + metadata: metadata, + serializer: default); + } + } + + await msg.ReplyAsync(); + + msg = await reader.ReadAsync(); + + await server.StopAsync(); + + if (msg.Subject == "tests.service.core.destroy.command") + { + await msg.ReplyAsync(); + } + else + { + Test.Log($"{msg.Subject}"); + } + } +} diff --git a/tests/Nats.Client.Compat/Test.cs b/tests/Nats.Client.Compat/Test.cs new file mode 100644 index 000000000..dd0b4111a --- /dev/null +++ b/tests/Nats.Client.Compat/Test.cs @@ -0,0 +1,73 @@ +using System.Reflection; +using System.Threading.Channels; +using NATS.Client.Core; + +namespace Nats.Client.Compat; + +public class Test +{ + public static async Task RunAsync() + { + var url = Environment.GetEnvironmentVariable("NATS_URL") ?? NatsOpts.Default.Url; + var opts = NatsOpts.Default with { Url = url }; + await using var nats = new NatsConnection(opts); + + Log($"Connected to NATS server {url}"); + + await using var sub = await nats.SubscribeAsync>("tests.>"); + + Log($"Subscribed to {sub.Subject}"); + Log($"Ready to receive test messages..."); + + while (true) + { + var reader = sub.Msgs; + var msg = await reader.ReadAsync(); + + if (msg.Subject == "tests.done") + { + Log("Bye"); + break; + } + + var tokens = msg.Subject.Split('.'); + var suite = tokens[1]; + var test = tokens[2]; + var command = tokens[3]; + var action = tokens[4]; + + if (action == "result") + { + Log($"{command.ToUpper()}"); + } + else if (action == "command") + { + // We turn the suite name to a class name suffixed by 'Compat' and locate that class. + // For example suite 'object-store' becomes class name 'ObjectStoreCompat' + // which is our class containing those tests. + var typeName = typeof(Test).Namespace + "." + suite.Replace("-", string.Empty) + "compat"; + var type = typeof(Test).Assembly.GetType(typeName, true, true); + var instance = Activator.CreateInstance(type!); + + // Transform the test name to a method name prefixed by 'Test' + // so the test 'default-bucket' matches the method 'TestDefaultBucket' + var methodName = "test" + test.Replace("-", string.Empty); + var method = type!.GetMethod(methodName, BindingFlags.Public | BindingFlags.Instance | BindingFlags.IgnoreCase); + + Log($"Testing {suite} {test}..."); + + if (method != null) + { + await (Task)method.Invoke(instance, new object[] { nats, msg, reader })!; + } + else + { + Log($"Not implemented: {test}"); + await msg.ReplyAsync(); + } + } + } + } + + public static void Log(string message) => Console.WriteLine($"{DateTime.Now:hh:mm:ss} {message}"); +}