Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KV try get #688

Merged
merged 9 commits into from
Dec 10, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Try-get implementation
  • Loading branch information
mtmk committed Dec 4, 2024
commit 9b4d49bfa4a51f90bf9c85358a295761f0542785
27 changes: 23 additions & 4 deletions sandbox/MicroBenchmark/KVBench.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Attributes;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

#pragma warning disable CS8618

namespace MicroBenchmark;

[MemoryDiagnoser]
@@ -13,7 +15,7 @@ public class KVBench
private NatsConnection _nats;
private NatsJSContext _js;
private NatsKVContext _kv;
private INatsKVStore _store;
private NatsKVStore _store;

[Params(64, 512, 1024)]
public int Iter { get; set; }
@@ -24,18 +26,35 @@ public async Task SetupAsync()
_nats = new NatsConnection();
_js = new NatsJSContext(_nats);
_kv = new NatsKVContext(_js);
_store = await _kv.CreateStoreAsync("benchmark");
_store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark"));
}

[Benchmark]
public async ValueTask<int> TryGetAsync()
{
var total = 0;
for (var i = 0; i < Iter; i++)
{
var result = await _store.TryGetEntryAsync<int>("does.not.exist");
if (result is { Success: false, Error: NatsKVKeyNotFoundException })
total++;
}

if (total != Iter)
throw new Exception();

return total;
}

[Benchmark]
public async ValueTask<int> GetAsyncNew()
{
var total = 0;
for (var i = 0; i < Iter; i++)
{
try
{
await _store.GetEntryAsync<int>("does.not.exist");
await _store.GetEntryAsyncNew<int>("does.not.exist");
}
catch (NatsKVKeyNotFoundException)
{
2 changes: 1 addition & 1 deletion sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.12" />
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="StackExchange.Redis" Version="2.5.43" />
<PackageReference Include="ZLogger" Version="1.6.1" />
<PackageReference Include="NATS.Client" Version="0.14.5" />
7 changes: 7 additions & 0 deletions src/NATS.Client.Core/NatsResult.cs
Original file line number Diff line number Diff line change
@@ -29,6 +29,13 @@ public NatsResult(Exception error)

public static implicit operator NatsResult<T>(Exception error) => new(error);

[MethodImpl(MethodImplOptions.NoInlining)]
public void EnsureSuccess()
{
if (_error != null)
throw _error;
mtmk marked this conversation as resolved.
Show resolved Hide resolved
}

private static T ThrowValueIsNotSetException() => throw CreateInvalidOperationException("Result value is not set");

private static Exception ThrowErrorIsNotSetException() => throw CreateInvalidOperationException("Result error is not set");
2 changes: 2 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVException.cs
Original file line number Diff line number Diff line change
@@ -54,6 +54,8 @@ public NatsKVCreateException()

public class NatsKVKeyNotFoundException : NatsKVException
{
public static readonly NatsKVKeyNotFoundException Default = new();

public NatsKVKeyNotFoundException()
: base("Key not found")
{
135 changes: 135 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
@@ -292,6 +292,141 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
}
}

public async ValueTask<NatsKVEntry<T>> GetEntryAsyncNew<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken);
result.EnsureSuccess();
mtmk marked this conversation as resolved.
Show resolved Hide resolved
return result.Value;
}

/// <inheritdoc />
public async ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer<T>();

var request = new StreamMsgGetRequest();
var keySubject = $"$KV.{Bucket}.{key}";
mtmk marked this conversation as resolved.
Show resolved Hide resolved

if (revision == default)
{
request.LastBySubj = keySubject;
}
else
{
request.Seq = revision;
request.NextBySubj = keySubject;
}

if (_stream.Info.Config.AllowDirect)
{
var direct = await _stream.GetDirectAsync<T>(request, serializer, cancellationToken);

if (direct is { Headers: { } headers } msg)
{
if (headers.Code == 404)
mtmk marked this conversation as resolved.
Show resolved Hide resolved
return NatsKVKeyNotFoundException.Default;

if (!headers.TryGetLastValue(NatsSubject, out var subject))
return new NatsKVException("Missing sequence header");

if (revision != default)
{
if (!string.Equals(subject, keySubject, StringComparison.Ordinal))
{
return new NatsKVException("Unexpected subject");
}
}

if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue))
return new NatsKVException("Missing sequence header");

if (!ulong.TryParse(sequenceValue, out var sequence))
return new NatsKVException("Can't parse sequence header");

if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue))
return new NatsKVException("Missing timestamp header");

if (!DateTimeOffset.TryParse(timestampValue, out var timestamp))
return new NatsKVException("Can't parse timestamp header");

var operation = NatsKVOperation.Put;
if (headers.TryGetValue(KVOperation, out var operationValues))
{
if (operationValues.Count != 1)
return new NatsKVException("Unexpected number of operation headers");

if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation))
return new NatsKVException("Can't parse operation header");
}

if (operation is NatsKVOperation.Del or NatsKVOperation.Purge)
{
return new NatsKVKeyDeletedException(sequence);
}

return new NatsKVEntry<T>(Bucket, key)
{
Bucket = Bucket,
Key = key,
Created = timestamp,
Revision = sequence,
Operation = operation,
Value = msg.Data,
Delta = 0,
UsedDirectGet = true,
Error = msg.Error,
};
}
else
{
return new NatsKVException("Missing headers");
}
}
else
{
var response = await _stream.GetAsync(request, cancellationToken);

if (revision != default)
{
if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal))
{
return new NatsKVException("Unexpected subject");
}
}

T? data;
NatsDeserializeException? deserializeException = null;
if (response.Message.Data.Length > 0)
{
var buffer = new ReadOnlySequence<byte>(response.Message.Data);

try
{
data = serializer.Deserialize(buffer);
}
catch (Exception e)
{
deserializeException = new NatsDeserializeException(buffer.ToArray(), e);
data = default;
}
}
else
{
data = default;
}

return new NatsKVEntry<T>(Bucket, key)
{
Created = response.Message.Time,
Revision = response.Message.Seq,
Value = data,
UsedDirectGet = false,
Error = deserializeException,
};
}
}

/// <inheritdoc />
public IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default)
=> WatchAsync<T>([key], serializer, opts, cancellationToken);
Loading