Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add KV try get #688

Merged
merged 9 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions sandbox/MicroBenchmark/KVBench.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
using BenchmarkDotNet.Attributes;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;

#pragma warning disable CS8618

namespace MicroBenchmark;

[MemoryDiagnoser]
[PlainExporter]
public class KvBench
{
private NatsConnection _nats;
private NatsJSContext _js;
private NatsKVContext _kv;
private NatsKVStore _store;

[GlobalSetup]
public async Task SetupAsync()
{
_nats = new NatsConnection();
_js = new NatsJSContext(_nats);
_kv = new NatsKVContext(_js);
_store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark"));
}

[Benchmark]
public async ValueTask<int> TryGetAsync()
{
var result = await _store.TryGetEntryAsync<int>("does.not.exist");
if (result is { Success: false, Error: NatsKVKeyNotFoundException })
{
return 1;
}

return 0;
}

[Benchmark]
public async ValueTask<int> GetAsync()
{
try
{
await _store.GetEntryAsync<int>("does.not.exist");
}
catch (NatsKVKeyNotFoundException)
{
return 1;
}

return 0;
}

[Benchmark]
public async ValueTask<int> TryGetMultiAsync()
{
List<Task> tasks = new();
for (var i = 0; i < 100; i++)
{
tasks.Add(Task.Run(async () =>
{
var result = await _store.TryGetEntryAsync<int>("does.not.exist");
if (result is { Success: false, Error: NatsKVKeyNotFoundException })
{
return 1;
}

return 0;
}));
}

await Task.WhenAll(tasks);

return 0;
}

[Benchmark]
public async ValueTask<int> GetMultiAsync()
{
List<Task> tasks = new();
for (var i = 0; i < 100; i++)
{
tasks.Add(Task.Run(async () =>
{
try
{
await _store.GetEntryAsync<int>("does.not.exist");
}
catch (NatsKVKeyNotFoundException)
{
return 1;
}

return 0;
}));
}

await Task.WhenAll(tasks);

return 0;
}
}
3 changes: 2 additions & 1 deletion sandbox/MicroBenchmark/MicroBenchmark.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.13.12" />
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
<PackageReference Include="StackExchange.Redis" Version="2.5.43" />
<PackageReference Include="ZLogger" Version="1.6.1" />
<PackageReference Include="NATS.Client" Version="0.14.5" />
Expand All @@ -20,6 +20,7 @@
<ItemGroup>
<ProjectReference Include="..\..\src\NATS.Client.Core\NATS.Client.Core.csproj" />
<ProjectReference Include="..\..\src\NATS.Client.JetStream\NATS.Client.JetStream.csproj" />
<ProjectReference Include="..\..\src\NATS.Client.KeyValueStore\NATS.Client.KeyValueStore.csproj" />
</ItemGroup>

</Project>
14 changes: 14 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public interface INatsKVStore
/// <exception cref="NatsKVException">There was an error with metadata</exception>
ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Try to get an entry from the bucket using the key.
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="revision">Revision to retrieve</param>
/// <param name="serializer">Optional serialized to override the default</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>A NatsResult object representing the value or an error.</returns>
/// <remarks>
/// Use this method to avoid exceptions when, for example, the key is not found.
/// </remarks>
ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Start a watcher for specific keys
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public NatsKVCreateException()

public class NatsKVKeyNotFoundException : NatsKVException
{
public static readonly NatsKVKeyNotFoundException Default = new();

public NatsKVKeyNotFoundException()
: base("Key not found")
{
Expand Down
59 changes: 43 additions & 16 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,23 @@ public class NatsKVStore : INatsKVStore
private const string NatsSequence = "Nats-Sequence";
private const string NatsTimeStamp = "Nats-Time-Stamp";
private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
mtmk marked this conversation as resolved.
Show resolved Hide resolved
private static readonly NatsKVException MissingSequenceHeaderException = new("Missing sequence header");
private static readonly NatsKVException MissingTimestampHeaderException = new("Missing timestamp header");
private static readonly NatsKVException MissingHeadersException = new("Missing headers");
private static readonly NatsKVException UnexpectedSubjectException = new("Unexpected subject");
private static readonly NatsKVException UnexpectedNumberOfOperationHeadersException = new("Unexpected number of operation headers");
private static readonly NatsKVException InvalidSequenceException = new("Can't parse sequence header");
private static readonly NatsKVException InvalidTimestampException = new("Can't parse timestamp header");
private static readonly NatsKVException InvalidOperationException = new("Can't parse operation header");
private readonly INatsJSStream _stream;
private readonly string _kvBucket;

internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream)
{
Bucket = bucket;
JetStreamContext = context;
_stream = stream;
_kvBucket = $"$KV.{Bucket}.";
}

/// <inheritdoc />
Expand Down Expand Up @@ -166,13 +176,27 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel

/// <inheritdoc />
public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken);
if (!result.Success)
{
ThrowException(result.Error);
}
mtmk marked this conversation as resolved.
Show resolved Hide resolved

return result.Value;
}

/// <inheritdoc />
#if !NETSTANDARD
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
#endif
public async ValueTask<NatsResult<NatsKVEntry<T>>> TryGetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default)
{
ValidateKey(key);
serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer<T>();
var keySubject = _kvBucket + key;

var request = new StreamMsgGetRequest();
var keySubject = $"$KV.{Bucket}.{key}";

if (revision == default)
{
request.LastBySubj = keySubject;
Expand All @@ -190,44 +214,44 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
if (direct is { Headers: { } headers } msg)
{
if (headers.Code == 404)
mtmk marked this conversation as resolved.
Show resolved Hide resolved
throw new NatsKVKeyNotFoundException();
return NatsKVKeyNotFoundException.Default;

if (!headers.TryGetLastValue(NatsSubject, out var subject))
throw new NatsKVException("Missing sequence header");
return MissingSequenceHeaderException;

if (revision != default)
{
if (!string.Equals(subject, keySubject, StringComparison.Ordinal))
{
throw new NatsKVException("Unexpected subject");
return UnexpectedSubjectException;
}
}

if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue))
throw new NatsKVException("Missing sequence header");
return MissingSequenceHeaderException;

if (!ulong.TryParse(sequenceValue, out var sequence))
throw new NatsKVException("Can't parse sequence header");
return InvalidSequenceException;

if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue))
throw new NatsKVException("Missing timestamp header");
return MissingTimestampHeaderException;

if (!DateTimeOffset.TryParse(timestampValue, out var timestamp))
throw new NatsKVException("Can't parse timestamp header");
return InvalidTimestampException;

var operation = NatsKVOperation.Put;
if (headers.TryGetValue(KVOperation, out var operationValues))
{
if (operationValues.Count != 1)
throw new NatsKVException("Unexpected number of operation headers");
return UnexpectedNumberOfOperationHeadersException;

if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation))
throw new NatsKVException("Can't parse operation header");
return InvalidOperationException;
}

if (operation is NatsKVOperation.Del or NatsKVOperation.Purge)
{
throw new NatsKVKeyDeletedException(sequence);
return new NatsKVKeyDeletedException(sequence);
}

return new NatsKVEntry<T>(Bucket, key)
Expand All @@ -245,7 +269,7 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
}
else
{
throw new NatsKVException("Missing headers");
return MissingHeadersException;
}
}
else
Expand All @@ -256,7 +280,7 @@ public async ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revisi
{
if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal))
{
throw new NatsKVException("Unexpected subject");
return UnexpectedSubjectException;
}
}

Expand Down Expand Up @@ -452,12 +476,12 @@ internal async ValueTask<NatsKVWatcher<T>> WatchInternalAsync<T>(IEnumerable<str
/// </summary>
private static void ValidateKey(string key)
{
if (string.IsNullOrWhiteSpace(key))
if (string.IsNullOrWhiteSpace(key) || key.Length == 0)
{
ThrowNatsKVException("Key cannot be empty");
}

if (key.StartsWith(".") || key.EndsWith("."))
if (key[0] == '.' || key[^1] == '.')
{
ThrowNatsKVException("Key cannot start or end with a period");
}
Expand All @@ -470,6 +494,9 @@ private static void ValidateKey(string key)

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowNatsKVException(string message) => throw new NatsKVException(message);

[MethodImpl(MethodImplOptions.NoInlining)]
private static void ThrowException(Exception exception) => throw exception;
}

public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);
Loading