From 8cdfbbae1e7bf947cea30095ef96e8b57a92fa69 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 4 Dec 2024 11:49:18 +0000 Subject: [PATCH 1/9] Add KV try get --- sandbox/MicroBenchmark/KVBench.cs | 73 +++++++++++++++++++ sandbox/MicroBenchmark/MicroBenchmark.csproj | 1 + src/NATS.Client.KeyValueStore/INatsKVStore.cs | 2 + 3 files changed, 76 insertions(+) create mode 100644 sandbox/MicroBenchmark/KVBench.cs diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs new file mode 100644 index 000000000..cb3c6cd00 --- /dev/null +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -0,0 +1,73 @@ +using BenchmarkDotNet.Attributes; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; + +namespace MicroBenchmark; + +[MemoryDiagnoser] +[ShortRunJob] +[PlainExporter] +public class KVBench +{ + private NatsConnection _nats; + private NatsJSContext _js; + private NatsKVContext _kv; + private INatsKVStore _store; + + [Params(64, 512, 1024)] + public int Iter { get; set; } + + [GlobalSetup] + public async Task SetupAsync() + { + _nats = new NatsConnection(); + _js = new NatsJSContext(_nats); + _kv = new NatsKVContext(_js); + _store = await _kv.CreateStoreAsync("benchmark"); + } + + [Benchmark] + public async ValueTask TryGetAsync() + { + var total = 0; + for (var i = 0; i < Iter; i++) + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + total++; + } + } + + if (total != Iter) + throw new Exception(); + + return total; + } + + [Benchmark] + public async ValueTask GetAsync() + { + var total = 0; + for (var i = 0; i < Iter; i++) + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + total++; + } + } + + if (total != Iter) + throw new Exception(); + + return total; + } +} diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 96d4079ab..6e3ae052e 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -20,6 +20,7 @@ + diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index 1c99a0a3c..49b6fa40c 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -77,6 +77,8 @@ public interface INatsKVStore /// There was an error with metadata ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// /// Start a watcher for specific keys /// From 9b4d49bfa4a51f90bf9c85358a295761f0542785 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 4 Dec 2024 17:21:12 +0000 Subject: [PATCH 2/9] Try-get implementation --- sandbox/MicroBenchmark/KVBench.cs | 27 +++- sandbox/MicroBenchmark/MicroBenchmark.csproj | 2 +- src/NATS.Client.Core/NatsResult.cs | 7 + .../NatsKVException.cs | 2 + src/NATS.Client.KeyValueStore/NatsKVStore.cs | 135 ++++++++++++++++++ 5 files changed, 168 insertions(+), 5 deletions(-) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index cb3c6cd00..7e773d414 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -1,8 +1,10 @@ -using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Attributes; using NATS.Client.Core; using NATS.Client.JetStream; using NATS.Client.KeyValueStore; +#pragma warning disable CS8618 + namespace MicroBenchmark; [MemoryDiagnoser] @@ -13,7 +15,7 @@ public class KVBench private NatsConnection _nats; private NatsJSContext _js; private NatsKVContext _kv; - private INatsKVStore _store; + private NatsKVStore _store; [Params(64, 512, 1024)] public int Iter { get; set; } @@ -24,18 +26,35 @@ public async Task SetupAsync() _nats = new NatsConnection(); _js = new NatsJSContext(_nats); _kv = new NatsKVContext(_js); - _store = await _kv.CreateStoreAsync("benchmark"); + _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); } [Benchmark] public async ValueTask TryGetAsync() + { + var total = 0; + for (var i = 0; i < Iter; i++) + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + total++; + } + + if (total != Iter) + throw new Exception(); + + return total; + } + + [Benchmark] + public async ValueTask GetAsyncNew() { var total = 0; for (var i = 0; i < Iter; i++) { try { - await _store.GetEntryAsync("does.not.exist"); + await _store.GetEntryAsyncNew("does.not.exist"); } catch (NatsKVKeyNotFoundException) { diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 6e3ae052e..133cafa6b 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/NATS.Client.Core/NatsResult.cs b/src/NATS.Client.Core/NatsResult.cs index 3b246aa6b..90d4e61aa 100644 --- a/src/NATS.Client.Core/NatsResult.cs +++ b/src/NATS.Client.Core/NatsResult.cs @@ -29,6 +29,13 @@ public NatsResult(Exception error) public static implicit operator NatsResult(Exception error) => new(error); + [MethodImpl(MethodImplOptions.NoInlining)] + public void EnsureSuccess() + { + if (_error != null) + throw _error; + } + private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set"); private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set"); diff --git a/src/NATS.Client.KeyValueStore/NatsKVException.cs b/src/NATS.Client.KeyValueStore/NatsKVException.cs index d5d8ad36e..e89c73622 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVException.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVException.cs @@ -54,6 +54,8 @@ public NatsKVCreateException() public class NatsKVKeyNotFoundException : NatsKVException { + public static readonly NatsKVKeyNotFoundException Default = new(); + public NatsKVKeyNotFoundException() : base("Key not found") { diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index cd2a32fc9..197b30290 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -292,6 +292,141 @@ public async ValueTask> GetEntryAsync(string key, ulong revisi } } + public async ValueTask> GetEntryAsyncNew(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken); + result.EnsureSuccess(); + return result.Value; + } + + /// + public async ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + + var request = new StreamMsgGetRequest(); + var keySubject = $"$KV.{Bucket}.{key}"; + + if (revision == default) + { + request.LastBySubj = keySubject; + } + else + { + request.Seq = revision; + request.NextBySubj = keySubject; + } + + if (_stream.Info.Config.AllowDirect) + { + var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); + + if (direct is { Headers: { } headers } msg) + { + if (headers.Code == 404) + return NatsKVKeyNotFoundException.Default; + + if (!headers.TryGetLastValue(NatsSubject, out var subject)) + return new NatsKVException("Missing sequence header"); + + if (revision != default) + { + if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) + { + return new NatsKVException("Unexpected subject"); + } + } + + if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) + return new NatsKVException("Missing sequence header"); + + if (!ulong.TryParse(sequenceValue, out var sequence)) + return new NatsKVException("Can't parse sequence header"); + + if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) + return new NatsKVException("Missing timestamp header"); + + if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) + return new NatsKVException("Can't parse timestamp header"); + + var operation = NatsKVOperation.Put; + if (headers.TryGetValue(KVOperation, out var operationValues)) + { + if (operationValues.Count != 1) + return new NatsKVException("Unexpected number of operation headers"); + + if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) + return new NatsKVException("Can't parse operation header"); + } + + if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) + { + return new NatsKVKeyDeletedException(sequence); + } + + return new NatsKVEntry(Bucket, key) + { + Bucket = Bucket, + Key = key, + Created = timestamp, + Revision = sequence, + Operation = operation, + Value = msg.Data, + Delta = 0, + UsedDirectGet = true, + Error = msg.Error, + }; + } + else + { + return new NatsKVException("Missing headers"); + } + } + else + { + var response = await _stream.GetAsync(request, cancellationToken); + + if (revision != default) + { + if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) + { + return new NatsKVException("Unexpected subject"); + } + } + + T? data; + NatsDeserializeException? deserializeException = null; + if (response.Message.Data.Length > 0) + { + var buffer = new ReadOnlySequence(response.Message.Data); + + try + { + data = serializer.Deserialize(buffer); + } + catch (Exception e) + { + deserializeException = new NatsDeserializeException(buffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + + return new NatsKVEntry(Bucket, key) + { + Created = response.Message.Time, + Revision = response.Message.Seq, + Value = data, + UsedDirectGet = false, + Error = deserializeException, + }; + } + } + /// public IAsyncEnumerable> WatchAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) => WatchAsync([key], serializer, opts, cancellationToken); From 2965f3ac4cac7ba8937beeb231efa25e7de09a2d Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Dec 2024 14:55:08 +0000 Subject: [PATCH 3/9] Added perf tweaks --- sandbox/MicroBenchmark/KVBench.cs | 61 ++++++-------------- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 48 ++++++++++----- 2 files changed, 54 insertions(+), 55 deletions(-) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index 7e773d414..d4258d1f3 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -8,7 +8,6 @@ namespace MicroBenchmark; [MemoryDiagnoser] -[ShortRunJob] [PlainExporter] public class KVBench { @@ -17,9 +16,6 @@ public class KVBench private NatsKVContext _kv; private NatsKVStore _store; - [Params(64, 512, 1024)] - public int Iter { get; set; } - [GlobalSetup] public async Task SetupAsync() { @@ -32,61 +28,42 @@ public async Task SetupAsync() [Benchmark] public async ValueTask TryGetAsync() { - var total = 0; - for (var i = 0; i < Iter; i++) + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) { - var result = await _store.TryGetEntryAsync("does.not.exist"); - if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - total++; + return 1; } - if (total != Iter) - throw new Exception(); - - return total; + return 0; } [Benchmark] public async ValueTask GetAsyncNew() { - var total = 0; - for (var i = 0; i < Iter; i++) + try { - try - { - await _store.GetEntryAsyncNew("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - total++; - } + await _store.GetEntryAsyncNew("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; } - if (total != Iter) - throw new Exception(); - - return total; + return 0; } - [Benchmark] + [Benchmark(Baseline = true)] public async ValueTask GetAsync() { - var total = 0; - for (var i = 0; i < Iter; i++) + try { - try - { - await _store.GetEntryAsync("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - total++; - } + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; } - if (total != Iter) - throw new Exception(); - - return total; + return 0; } } diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index 197b30290..c326c7498 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -34,6 +34,7 @@ public enum NatsKVOperation /// public class NatsKVStore : INatsKVStore { + private const string NatsExpectedLastSubjectSequence = "Nats-Expected-Last-Subject-Sequence"; private const string KVOperation = "KV-Operation"; private const string NatsRollup = "Nats-Rollup"; @@ -44,6 +45,14 @@ public class NatsKVStore : INatsKVStore private const string NatsSequence = "Nats-Sequence"; private const string NatsTimeStamp = "Nats-Time-Stamp"; private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); + private static readonly NatsKVException MissingSequenceHeaderException = new("Missing sequence header"); + private static readonly NatsKVException MissingTimestampHeaderException = new("Missing timestamp header"); + private static readonly NatsKVException MissingHeadersException = new("Missing headers"); + private static readonly NatsKVException UnexpectedSubjectException = new("Unexpected subject"); + private static readonly NatsKVException UnexpectedNumberOfOperationHeadersException = new("Unexpected number of operation headers"); + private static readonly NatsKVException InvalidSequenceException = new("Can't parse sequence header"); + private static readonly NatsKVException InvalidTimestampException = new("Can't parse timestamp header"); + private static readonly NatsKVException InvalidOperationException = new("Can't parse operation header"); private readonly INatsJSStream _stream; internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) @@ -300,14 +309,27 @@ public async ValueTask> GetEntryAsyncNew(string key, ulong rev } /// +#if !NETSTANDARD + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] +#endif public async ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); - var request = new StreamMsgGetRequest(); +#if NET8_0_OR_GREATER + var keySubject = string.Create(key.Length + Bucket.Length + 5, (Bucket, key), static (span, state) => + { + "$KV.".CopyTo(span); + state.Bucket.CopyTo(span[4..]); + span[state.Bucket.Length + 4] = '.'; + state.key.CopyTo(span[(state.Bucket.Length + 5)..]); + }); +#else var keySubject = $"$KV.{Bucket}.{key}"; +#endif + var request = new StreamMsgGetRequest(); if (revision == default) { request.LastBySubj = keySubject; @@ -328,36 +350,36 @@ public async ValueTask>> TryGetEntryAsync(string ke return NatsKVKeyNotFoundException.Default; if (!headers.TryGetLastValue(NatsSubject, out var subject)) - return new NatsKVException("Missing sequence header"); + return MissingSequenceHeaderException; if (revision != default) { if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) { - return new NatsKVException("Unexpected subject"); + return UnexpectedSubjectException; } } if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) - return new NatsKVException("Missing sequence header"); + return MissingSequenceHeaderException; if (!ulong.TryParse(sequenceValue, out var sequence)) - return new NatsKVException("Can't parse sequence header"); + return InvalidSequenceException; if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) - return new NatsKVException("Missing timestamp header"); + return MissingTimestampHeaderException; if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) - return new NatsKVException("Can't parse timestamp header"); + return InvalidTimestampException; var operation = NatsKVOperation.Put; if (headers.TryGetValue(KVOperation, out var operationValues)) { if (operationValues.Count != 1) - return new NatsKVException("Unexpected number of operation headers"); + return UnexpectedNumberOfOperationHeadersException; if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) - return new NatsKVException("Can't parse operation header"); + return InvalidOperationException; } if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) @@ -380,7 +402,7 @@ public async ValueTask>> TryGetEntryAsync(string ke } else { - return new NatsKVException("Missing headers"); + return MissingHeadersException; } } else @@ -391,7 +413,7 @@ public async ValueTask>> TryGetEntryAsync(string ke { if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) { - return new NatsKVException("Unexpected subject"); + return UnexpectedSubjectException; } } @@ -587,12 +609,12 @@ internal async ValueTask> WatchInternalAsync(IEnumerable private static void ValidateKey(string key) { - if (string.IsNullOrWhiteSpace(key)) + if (string.IsNullOrWhiteSpace(key) || key.Length == 0) { ThrowNatsKVException("Key cannot be empty"); } - if (key.StartsWith(".") || key.EndsWith(".")) + if (key[0] == '.' || key[^1] == '.') { ThrowNatsKVException("Key cannot start or end with a period"); } From 73e5241b2475f83d7e6b877144db01db1a544d93 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Dec 2024 15:12:50 +0000 Subject: [PATCH 4/9] Format --- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index c326c7498..9b0f46eaa 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -34,7 +34,6 @@ public enum NatsKVOperation /// public class NatsKVStore : INatsKVStore { - private const string NatsExpectedLastSubjectSequence = "Nats-Expected-Last-Subject-Sequence"; private const string KVOperation = "KV-Operation"; private const string NatsRollup = "Nats-Rollup"; From 88ae07e1b26779c05cde5cd8002320a455ebd401 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Dec 2024 16:56:23 +0000 Subject: [PATCH 5/9] Add bench --- sandbox/MicroBenchmark/KVBench.cs | 49 +++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index d4258d1f3..5d489c0b2 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -66,4 +66,53 @@ public async ValueTask GetAsync() return 0; } + + [Benchmark] + public async ValueTask TryGetMultiAsync() + { + List tasks = new(); + for (var i = 0; i < 100; i++) + { + tasks.Add(Task.Run(async () => + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + { + return 1; + } + + return 0; + })); + } + + await Task.WhenAll(tasks); + + return 0; + } + + [Benchmark] + public async ValueTask GetMultiAsync() + { + List tasks = new(); + for (var i = 0; i < 100; i++) + { + tasks.Add(Task.Run(async () => + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; + } + + return 0; + })); + } + + await Task.WhenAll(tasks); + + return 0; + } } From 05b0bd6c0b0ce4a7f5ee429aa4e6d8ad4af6ad70 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Dec 2024 18:44:34 +0000 Subject: [PATCH 6/9] Add TryGetEntryAsync method to INatsKVStore interface --- src/NATS.Client.KeyValueStore/INatsKVStore.cs | 12 ++ src/NATS.Client.KeyValueStore/NatsKVStore.cs | 127 ------------------ 2 files changed, 12 insertions(+), 127 deletions(-) diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index 49b6fa40c..ecbf0d072 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -77,6 +77,18 @@ public interface INatsKVStore /// There was an error with metadata ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// + /// Try to get an entry from the bucket using the key. + /// + /// Key of the entry + /// Revision to retrieve + /// Optional serialized to override the default + /// A used to cancel the API call. + /// Serialized value type + /// A NatsResult object representing the value or an error. + /// + /// Use this method to avoid exceptions when, for example, the key is not found. + /// ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); /// diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index 9b0f46eaa..5c5072571 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -174,133 +174,6 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel /// public async ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) - { - ValidateKey(key); - serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); - - var request = new StreamMsgGetRequest(); - var keySubject = $"$KV.{Bucket}.{key}"; - - if (revision == default) - { - request.LastBySubj = keySubject; - } - else - { - request.Seq = revision; - request.NextBySubj = keySubject; - } - - if (_stream.Info.Config.AllowDirect) - { - var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); - - if (direct is { Headers: { } headers } msg) - { - if (headers.Code == 404) - throw new NatsKVKeyNotFoundException(); - - if (!headers.TryGetLastValue(NatsSubject, out var subject)) - throw new NatsKVException("Missing sequence header"); - - if (revision != default) - { - if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) - { - throw new NatsKVException("Unexpected subject"); - } - } - - if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) - throw new NatsKVException("Missing sequence header"); - - if (!ulong.TryParse(sequenceValue, out var sequence)) - throw new NatsKVException("Can't parse sequence header"); - - if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) - throw new NatsKVException("Missing timestamp header"); - - if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) - throw new NatsKVException("Can't parse timestamp header"); - - var operation = NatsKVOperation.Put; - if (headers.TryGetValue(KVOperation, out var operationValues)) - { - if (operationValues.Count != 1) - throw new NatsKVException("Unexpected number of operation headers"); - - if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) - throw new NatsKVException("Can't parse operation header"); - } - - if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) - { - throw new NatsKVKeyDeletedException(sequence); - } - - return new NatsKVEntry(Bucket, key) - { - Bucket = Bucket, - Key = key, - Created = timestamp, - Revision = sequence, - Operation = operation, - Value = msg.Data, - Delta = 0, - UsedDirectGet = true, - Error = msg.Error, - }; - } - else - { - throw new NatsKVException("Missing headers"); - } - } - else - { - var response = await _stream.GetAsync(request, cancellationToken); - - if (revision != default) - { - if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) - { - throw new NatsKVException("Unexpected subject"); - } - } - - T? data; - NatsDeserializeException? deserializeException = null; - if (response.Message.Data.Length > 0) - { - var buffer = new ReadOnlySequence(response.Message.Data); - - try - { - data = serializer.Deserialize(buffer); - } - catch (Exception e) - { - deserializeException = new NatsDeserializeException(buffer.ToArray(), e); - data = default; - } - } - else - { - data = default; - } - - return new NatsKVEntry(Bucket, key) - { - Created = response.Message.Time, - Revision = response.Message.Seq, - Value = data, - UsedDirectGet = false, - Error = deserializeException, - }; - } - } - - public async ValueTask> GetEntryAsyncNew(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken); result.EnsureSuccess(); From 384a88315f8c2f502cd486355640b894526258aa Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 5 Dec 2024 18:45:07 +0000 Subject: [PATCH 7/9] Remove obsolete GetAsyncNew method from KVBench.cs --- sandbox/MicroBenchmark/KVBench.cs | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index 5d489c0b2..0bf138731 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -37,21 +37,6 @@ public async ValueTask TryGetAsync() return 0; } - [Benchmark] - public async ValueTask GetAsyncNew() - { - try - { - await _store.GetEntryAsyncNew("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - return 1; - } - - return 0; - } - [Benchmark(Baseline = true)] public async ValueTask GetAsync() { From 99145a72041e77b56b445190519353f22b589adf Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Fri, 6 Dec 2024 12:58:25 +0000 Subject: [PATCH 8/9] Bench string --- sandbox/MicroBenchmark/KVBench.cs | 174 ++++++----- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 290 +++++++++++++++++++ 2 files changed, 396 insertions(+), 68 deletions(-) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index 0bf138731..2e291646e 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -8,6 +8,7 @@ namespace MicroBenchmark; [MemoryDiagnoser] +// [ShortRunJob] [PlainExporter] public class KVBench { @@ -25,79 +26,116 @@ public async Task SetupAsync() _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); } - [Benchmark] - public async ValueTask TryGetAsync() - { - var result = await _store.TryGetEntryAsync("does.not.exist"); - if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - { - return 1; - } - - return 0; - } + // [Benchmark(Baseline = true)] + // public async ValueTask TryGetAsync() + // { + // var result = await _store.TryGetEntryAsync("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask TryGetAsync2() + // { + // var result = await _store.TryGetEntryAsync2("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask TryGetAsync3() + // { + // var result = await _store.TryGetEntryAsync3("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // } [Benchmark(Baseline = true)] - public async ValueTask GetAsync() - { - try - { - await _store.GetEntryAsync("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - return 1; - } - - return 0; - } + public string StringOrig() => _store.StringOrig("does.not.exist"); [Benchmark] - public async ValueTask TryGetMultiAsync() - { - List tasks = new(); - for (var i = 0; i < 100; i++) - { - tasks.Add(Task.Run(async () => - { - var result = await _store.TryGetEntryAsync("does.not.exist"); - if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - { - return 1; - } - - return 0; - })); - } - - await Task.WhenAll(tasks); - - return 0; - } + public string StringInter() => _store.StringInter("does.not.exist"); [Benchmark] - public async ValueTask GetMultiAsync() - { - List tasks = new(); - for (var i = 0; i < 100; i++) - { - tasks.Add(Task.Run(async () => - { - try - { - await _store.GetEntryAsync("does.not.exist"); - } - catch (NatsKVKeyNotFoundException) - { - return 1; - } + public string StringConcat() => _store.StringConcat("does.not.exist"); - return 0; - })); - } - - await Task.WhenAll(tasks); - - return 0; - } + [Benchmark] + public string StringCreate() => _store.StringCreate("does.not.exist"); + + // + // [Benchmark(Baseline = true)] + // public async ValueTask GetAsync() + // { + // try + // { + // await _store.GetEntryAsync("does.not.exist"); + // } + // catch (NatsKVKeyNotFoundException) + // { + // return 1; + // } + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask TryGetMultiAsync() + // { + // List tasks = new(); + // for (var i = 0; i < 100; i++) + // { + // tasks.Add(Task.Run(async () => + // { + // var result = await _store.TryGetEntryAsync("does.not.exist"); + // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + // { + // return 1; + // } + // + // return 0; + // })); + // } + // + // await Task.WhenAll(tasks); + // + // return 0; + // } + // + // [Benchmark] + // public async ValueTask GetMultiAsync() + // { + // List tasks = new(); + // for (var i = 0; i < 100; i++) + // { + // tasks.Add(Task.Run(async () => + // { + // try + // { + // await _store.GetEntryAsync("does.not.exist"); + // } + // catch (NatsKVKeyNotFoundException) + // { + // return 1; + // } + // + // return 0; + // })); + // } + // + // await Task.WhenAll(tasks); + // + // return 0; + // } } diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index 5c5072571..fd32d7c8f 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -53,12 +53,14 @@ public class NatsKVStore : INatsKVStore private static readonly NatsKVException InvalidTimestampException = new("Can't parse timestamp header"); private static readonly NatsKVException InvalidOperationException = new("Can't parse operation header"); private readonly INatsJSStream _stream; + private readonly string _kvBucket; internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) { Bucket = bucket; JetStreamContext = context; _stream = stream; + _kvBucket = $"$KV.{Bucket}."; } /// @@ -321,6 +323,294 @@ public async ValueTask>> TryGetEntryAsync(string ke } } + +#if NET8_0_OR_GREATER + static void CreateKeyString(Span span, (string prefix, string key) state) + { + state.prefix.CopyTo(span); + state.key.CopyTo(span[state.prefix.Length..]); + } +#endif + +#if !NETSTANDARD + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] +#endif + public async ValueTask>> TryGetEntryAsync2(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + +#if NET8_0_OR_GREATER + var keySubject = string.Create(key.Length + _kvBucket.Length, (_kvBucket, key), CreateKeyString); +#else + var keySubject = $"{_kvBucket}{key}"; +#endif + + var request = new StreamMsgGetRequest(); + if (revision == default) + { + request.LastBySubj = keySubject; + } + else + { + request.Seq = revision; + request.NextBySubj = keySubject; + } + + if (_stream.Info.Config.AllowDirect) + { + var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); + + if (direct is { Headers: { } headers } msg) + { + if (headers.Code == 404) + return NatsKVKeyNotFoundException.Default; + + if (!headers.TryGetLastValue(NatsSubject, out var subject)) + return MissingSequenceHeaderException; + + if (revision != default) + { + if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) + return MissingSequenceHeaderException; + + if (!ulong.TryParse(sequenceValue, out var sequence)) + return InvalidSequenceException; + + if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) + return MissingTimestampHeaderException; + + if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) + return InvalidTimestampException; + + var operation = NatsKVOperation.Put; + if (headers.TryGetValue(KVOperation, out var operationValues)) + { + if (operationValues.Count != 1) + return UnexpectedNumberOfOperationHeadersException; + + if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) + return InvalidOperationException; + } + + if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) + { + return new NatsKVKeyDeletedException(sequence); + } + + return new NatsKVEntry(Bucket, key) + { + Bucket = Bucket, + Key = key, + Created = timestamp, + Revision = sequence, + Operation = operation, + Value = msg.Data, + Delta = 0, + UsedDirectGet = true, + Error = msg.Error, + }; + } + else + { + return MissingHeadersException; + } + } + else + { + var response = await _stream.GetAsync(request, cancellationToken); + + if (revision != default) + { + if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + T? data; + NatsDeserializeException? deserializeException = null; + if (response.Message.Data.Length > 0) + { + var buffer = new ReadOnlySequence(response.Message.Data); + + try + { + data = serializer.Deserialize(buffer); + } + catch (Exception e) + { + deserializeException = new NatsDeserializeException(buffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + + return new NatsKVEntry(Bucket, key) + { + Created = response.Message.Time, + Revision = response.Message.Seq, + Value = data, + UsedDirectGet = false, + Error = deserializeException, + }; + } + } + + #if !NETSTANDARD + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] +#endif + public async ValueTask>> TryGetEntryAsync3(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + ValidateKey(key); + serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + + var keySubject = $"{_kvBucket}{key}"; + + var request = new StreamMsgGetRequest(); + if (revision == default) + { + request.LastBySubj = keySubject; + } + else + { + request.Seq = revision; + request.NextBySubj = keySubject; + } + + if (_stream.Info.Config.AllowDirect) + { + var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); + + if (direct is { Headers: { } headers } msg) + { + if (headers.Code == 404) + return NatsKVKeyNotFoundException.Default; + + if (!headers.TryGetLastValue(NatsSubject, out var subject)) + return MissingSequenceHeaderException; + + if (revision != default) + { + if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) + return MissingSequenceHeaderException; + + if (!ulong.TryParse(sequenceValue, out var sequence)) + return InvalidSequenceException; + + if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) + return MissingTimestampHeaderException; + + if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) + return InvalidTimestampException; + + var operation = NatsKVOperation.Put; + if (headers.TryGetValue(KVOperation, out var operationValues)) + { + if (operationValues.Count != 1) + return UnexpectedNumberOfOperationHeadersException; + + if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) + return InvalidOperationException; + } + + if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) + { + return new NatsKVKeyDeletedException(sequence); + } + + return new NatsKVEntry(Bucket, key) + { + Bucket = Bucket, + Key = key, + Created = timestamp, + Revision = sequence, + Operation = operation, + Value = msg.Data, + Delta = 0, + UsedDirectGet = true, + Error = msg.Error, + }; + } + else + { + return MissingHeadersException; + } + } + else + { + var response = await _stream.GetAsync(request, cancellationToken); + + if (revision != default) + { + if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) + { + return UnexpectedSubjectException; + } + } + + T? data; + NatsDeserializeException? deserializeException = null; + if (response.Message.Data.Length > 0) + { + var buffer = new ReadOnlySequence(response.Message.Data); + + try + { + data = serializer.Deserialize(buffer); + } + catch (Exception e) + { + deserializeException = new NatsDeserializeException(buffer.ToArray(), e); + data = default; + } + } + else + { + data = default; + } + + return new NatsKVEntry(Bucket, key) + { + Created = response.Message.Time, + Revision = response.Message.Seq, + Value = data, + UsedDirectGet = false, + Error = deserializeException, + }; + } + } + + public string StringOrig(string key) => $"$KV.{Bucket}.{key}"; + + public string StringInter(string key) => $"{_kvBucket}{key}"; + + public string StringConcat(string key) => _kvBucket + key; + + public string StringCreate(string key) + { +#if NET8_0_OR_GREATER + return string.Create(key.Length + _kvBucket.Length, (_kvBucket, key), CreateKeyString); +#else + throw new NotImplementedException(); +#endif + } + /// public IAsyncEnumerable> WatchAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) => WatchAsync([key], serializer, opts, cancellationToken); From 9674eedf09f2706a7252341cc4da24270f66136a Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Sat, 7 Dec 2024 13:05:39 +0000 Subject: [PATCH 9/9] Tidy up --- sandbox/MicroBenchmark/KVBench.cs | 178 +++++------ src/NATS.Client.Core/NatsResult.cs | 7 - src/NATS.Client.KeyValueStore/NatsKVStore.cs | 310 +------------------ 3 files changed, 79 insertions(+), 416 deletions(-) diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs index 2e291646e..60b981988 100644 --- a/sandbox/MicroBenchmark/KVBench.cs +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -8,9 +8,8 @@ namespace MicroBenchmark; [MemoryDiagnoser] -// [ShortRunJob] [PlainExporter] -public class KVBench +public class KvBench { private NatsConnection _nats; private NatsJSContext _js; @@ -26,116 +25,79 @@ public async Task SetupAsync() _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); } - // [Benchmark(Baseline = true)] - // public async ValueTask TryGetAsync() - // { - // var result = await _store.TryGetEntryAsync("does.not.exist"); - // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - // { - // return 1; - // } - // - // return 0; - // } - // - // [Benchmark] - // public async ValueTask TryGetAsync2() - // { - // var result = await _store.TryGetEntryAsync2("does.not.exist"); - // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - // { - // return 1; - // } - // - // return 0; - // } - // - // [Benchmark] - // public async ValueTask TryGetAsync3() - // { - // var result = await _store.TryGetEntryAsync3("does.not.exist"); - // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - // { - // return 1; - // } - // - // return 0; - // } - - [Benchmark(Baseline = true)] - public string StringOrig() => _store.StringOrig("does.not.exist"); + [Benchmark] + public async ValueTask TryGetAsync() + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + { + return 1; + } + + return 0; + } [Benchmark] - public string StringInter() => _store.StringInter("does.not.exist"); + public async ValueTask GetAsync() + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; + } + + return 0; + } [Benchmark] - public string StringConcat() => _store.StringConcat("does.not.exist"); + public async ValueTask TryGetMultiAsync() + { + List tasks = new(); + for (var i = 0; i < 100; i++) + { + tasks.Add(Task.Run(async () => + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + { + return 1; + } + + return 0; + })); + } + + await Task.WhenAll(tasks); + + return 0; + } [Benchmark] - public string StringCreate() => _store.StringCreate("does.not.exist"); - - // - // [Benchmark(Baseline = true)] - // public async ValueTask GetAsync() - // { - // try - // { - // await _store.GetEntryAsync("does.not.exist"); - // } - // catch (NatsKVKeyNotFoundException) - // { - // return 1; - // } - // - // return 0; - // } - // - // [Benchmark] - // public async ValueTask TryGetMultiAsync() - // { - // List tasks = new(); - // for (var i = 0; i < 100; i++) - // { - // tasks.Add(Task.Run(async () => - // { - // var result = await _store.TryGetEntryAsync("does.not.exist"); - // if (result is { Success: false, Error: NatsKVKeyNotFoundException }) - // { - // return 1; - // } - // - // return 0; - // })); - // } - // - // await Task.WhenAll(tasks); - // - // return 0; - // } - // - // [Benchmark] - // public async ValueTask GetMultiAsync() - // { - // List tasks = new(); - // for (var i = 0; i < 100; i++) - // { - // tasks.Add(Task.Run(async () => - // { - // try - // { - // await _store.GetEntryAsync("does.not.exist"); - // } - // catch (NatsKVKeyNotFoundException) - // { - // return 1; - // } - // - // return 0; - // })); - // } - // - // await Task.WhenAll(tasks); - // - // return 0; - // } + public async ValueTask GetMultiAsync() + { + List tasks = new(); + for (var i = 0; i < 100; i++) + { + tasks.Add(Task.Run(async () => + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; + } + + return 0; + })); + } + + await Task.WhenAll(tasks); + + return 0; + } } diff --git a/src/NATS.Client.Core/NatsResult.cs b/src/NATS.Client.Core/NatsResult.cs index 90d4e61aa..3b246aa6b 100644 --- a/src/NATS.Client.Core/NatsResult.cs +++ b/src/NATS.Client.Core/NatsResult.cs @@ -29,13 +29,6 @@ public NatsResult(Exception error) public static implicit operator NatsResult(Exception error) => new(error); - [MethodImpl(MethodImplOptions.NoInlining)] - public void EnsureSuccess() - { - if (_error != null) - throw _error; - } - private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set"); private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set"); diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index fd32d7c8f..cd2717fc3 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -178,303 +178,23 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel public async ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken); - result.EnsureSuccess(); - return result.Value; - } - - /// -#if !NETSTANDARD - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] -#endif - public async ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) - { - ValidateKey(key); - serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); - -#if NET8_0_OR_GREATER - var keySubject = string.Create(key.Length + Bucket.Length + 5, (Bucket, key), static (span, state) => - { - "$KV.".CopyTo(span); - state.Bucket.CopyTo(span[4..]); - span[state.Bucket.Length + 4] = '.'; - state.key.CopyTo(span[(state.Bucket.Length + 5)..]); - }); -#else - var keySubject = $"$KV.{Bucket}.{key}"; -#endif - - var request = new StreamMsgGetRequest(); - if (revision == default) - { - request.LastBySubj = keySubject; - } - else - { - request.Seq = revision; - request.NextBySubj = keySubject; - } - - if (_stream.Info.Config.AllowDirect) - { - var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); - - if (direct is { Headers: { } headers } msg) - { - if (headers.Code == 404) - return NatsKVKeyNotFoundException.Default; - - if (!headers.TryGetLastValue(NatsSubject, out var subject)) - return MissingSequenceHeaderException; - - if (revision != default) - { - if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) - { - return UnexpectedSubjectException; - } - } - - if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) - return MissingSequenceHeaderException; - - if (!ulong.TryParse(sequenceValue, out var sequence)) - return InvalidSequenceException; - - if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) - return MissingTimestampHeaderException; - - if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) - return InvalidTimestampException; - - var operation = NatsKVOperation.Put; - if (headers.TryGetValue(KVOperation, out var operationValues)) - { - if (operationValues.Count != 1) - return UnexpectedNumberOfOperationHeadersException; - - if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) - return InvalidOperationException; - } - - if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) - { - return new NatsKVKeyDeletedException(sequence); - } - - return new NatsKVEntry(Bucket, key) - { - Bucket = Bucket, - Key = key, - Created = timestamp, - Revision = sequence, - Operation = operation, - Value = msg.Data, - Delta = 0, - UsedDirectGet = true, - Error = msg.Error, - }; - } - else - { - return MissingHeadersException; - } - } - else + if (!result.Success) { - var response = await _stream.GetAsync(request, cancellationToken); - - if (revision != default) - { - if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) - { - return UnexpectedSubjectException; - } - } - - T? data; - NatsDeserializeException? deserializeException = null; - if (response.Message.Data.Length > 0) - { - var buffer = new ReadOnlySequence(response.Message.Data); - - try - { - data = serializer.Deserialize(buffer); - } - catch (Exception e) - { - deserializeException = new NatsDeserializeException(buffer.ToArray(), e); - data = default; - } - } - else - { - data = default; - } - - return new NatsKVEntry(Bucket, key) - { - Created = response.Message.Time, - Revision = response.Message.Seq, - Value = data, - UsedDirectGet = false, - Error = deserializeException, - }; + ThrowException(result.Error); } - } - -#if NET8_0_OR_GREATER - static void CreateKeyString(Span span, (string prefix, string key) state) - { - state.prefix.CopyTo(span); - state.key.CopyTo(span[state.prefix.Length..]); + return result.Value; } -#endif + /// #if !NETSTANDARD [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] #endif - public async ValueTask>> TryGetEntryAsync2(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) - { - ValidateKey(key); - serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); - -#if NET8_0_OR_GREATER - var keySubject = string.Create(key.Length + _kvBucket.Length, (_kvBucket, key), CreateKeyString); -#else - var keySubject = $"{_kvBucket}{key}"; -#endif - - var request = new StreamMsgGetRequest(); - if (revision == default) - { - request.LastBySubj = keySubject; - } - else - { - request.Seq = revision; - request.NextBySubj = keySubject; - } - - if (_stream.Info.Config.AllowDirect) - { - var direct = await _stream.GetDirectAsync(request, serializer, cancellationToken); - - if (direct is { Headers: { } headers } msg) - { - if (headers.Code == 404) - return NatsKVKeyNotFoundException.Default; - - if (!headers.TryGetLastValue(NatsSubject, out var subject)) - return MissingSequenceHeaderException; - - if (revision != default) - { - if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) - { - return UnexpectedSubjectException; - } - } - - if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) - return MissingSequenceHeaderException; - - if (!ulong.TryParse(sequenceValue, out var sequence)) - return InvalidSequenceException; - - if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) - return MissingTimestampHeaderException; - - if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) - return InvalidTimestampException; - - var operation = NatsKVOperation.Put; - if (headers.TryGetValue(KVOperation, out var operationValues)) - { - if (operationValues.Count != 1) - return UnexpectedNumberOfOperationHeadersException; - - if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) - return InvalidOperationException; - } - - if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) - { - return new NatsKVKeyDeletedException(sequence); - } - - return new NatsKVEntry(Bucket, key) - { - Bucket = Bucket, - Key = key, - Created = timestamp, - Revision = sequence, - Operation = operation, - Value = msg.Data, - Delta = 0, - UsedDirectGet = true, - Error = msg.Error, - }; - } - else - { - return MissingHeadersException; - } - } - else - { - var response = await _stream.GetAsync(request, cancellationToken); - - if (revision != default) - { - if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) - { - return UnexpectedSubjectException; - } - } - - T? data; - NatsDeserializeException? deserializeException = null; - if (response.Message.Data.Length > 0) - { - var buffer = new ReadOnlySequence(response.Message.Data); - - try - { - data = serializer.Deserialize(buffer); - } - catch (Exception e) - { - deserializeException = new NatsDeserializeException(buffer.ToArray(), e); - data = default; - } - } - else - { - data = default; - } - - return new NatsKVEntry(Bucket, key) - { - Created = response.Message.Time, - Revision = response.Message.Seq, - Value = data, - UsedDirectGet = false, - Error = deserializeException, - }; - } - } - - #if !NETSTANDARD - [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] -#endif - public async ValueTask>> TryGetEntryAsync3(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + public async ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); - - var keySubject = $"{_kvBucket}{key}"; + var keySubject = _kvBucket + key; var request = new StreamMsgGetRequest(); if (revision == default) @@ -596,21 +316,6 @@ public async ValueTask>> TryGetEntryAsync3(string k } } - public string StringOrig(string key) => $"$KV.{Bucket}.{key}"; - - public string StringInter(string key) => $"{_kvBucket}{key}"; - - public string StringConcat(string key) => _kvBucket + key; - - public string StringCreate(string key) - { -#if NET8_0_OR_GREATER - return string.Create(key.Length + _kvBucket.Length, (_kvBucket, key), CreateKeyString); -#else - throw new NotImplementedException(); -#endif - } - /// public IAsyncEnumerable> WatchAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default) => WatchAsync([key], serializer, opts, cancellationToken); @@ -789,6 +494,9 @@ private static void ValidateKey(string key) [MethodImpl(MethodImplOptions.NoInlining)] private static void ThrowNatsKVException(string message) => throw new NatsKVException(message); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowException(Exception exception) => throw exception; } public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);