diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs index 07aaadc1f..7fcb73dd7 100644 --- a/src/NATS.Client.ObjectStore/Internal/Encoder.cs +++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Security.Cryptography; +using NATS.Client.Core; namespace NATS.Client.ObjectStore.Internal; @@ -68,18 +69,33 @@ public static string Encode(string arg) /// 'inArray' is null. /// offset or length is negative OR offset plus length is greater than the length of inArray. public static string Encode(Span inArray, bool raw = false) + { + using (var owner = EncodeToMemoryOwner(inArray, raw)) + { + var segment = owner.DangerousGetArray(); + if (segment.Array == null || segment.Array.Length == 0) + { + return string.Empty; + } + + return new string(segment.Array, segment.Offset, segment.Count); + } + } + + public static NatsMemoryOwner EncodeToMemoryOwner(Span inArray, bool raw = false) { var offset = 0; var length = inArray.Length; if (length == 0) - return string.Empty; + return NatsMemoryOwner.Empty; var lengthMod3 = length % 3; var limit = length - lengthMod3; - var output = new char[(length + 2) / 3 * 4]; + var owner = NatsMemoryOwner.Allocate((length + 2) / 3 * 4); var table = SBase64Table; int i, j = 0; + var output = owner.Span; // takes 3 bytes from inArray and insert 4 bytes into output for (i = offset; i < limit; i += 3) @@ -128,14 +144,14 @@ public static string Encode(Span inArray, bool raw = false) } if (raw) - return new string(output, 0, j); + return owner.Slice(0, j); for (var k = j; k < output.Length; k++) { output[k] = Base64PadCharacter; } - return new string(output); + return owner; } /// diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 0e5163c6d..479c43718 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -94,65 +94,74 @@ public async ValueTask GetAsync(string key, Stream stream, bool pushConsumer.Init(); - string digest; - var chunks = 0; - var size = 0; - using (var sha256 = SHA256.Create()) + var digest = NatsMemoryOwner.Empty; + try { + var chunks = 0; + var size = 0; + using (var sha256 = SHA256.Create()) + { #if NETSTANDARD2_0 - using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) + using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) #else - await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen)) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen)) #endif - { - await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { - // We have to make sure to carry on consuming the channel to avoid any blocking: - // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally - // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level - // and signal the server that we are a slow consumer); then when we make an request-reply API call to - // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to - // keep consuming the channel to avoid this. - if (pushConsumer.IsDone) - continue; - - if (msg.Data.Length > 0) + await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { - using var memoryOwner = msg.Data; - chunks++; - size += memoryOwner.Memory.Length; + // We have to make sure to carry on consuming the channel to avoid any blocking: + // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally + // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level + // and signal the server that we are a slow consumer); then when we make an request-reply API call to + // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to + // keep consuming the channel to avoid this. + if (pushConsumer.IsDone) + continue; + + if (msg.Data.Length > 0) + { + using var memoryOwner = msg.Data; + chunks++; + size += memoryOwner.Memory.Length; #if NETSTANDARD2_0 - var segment = memoryOwner.DangerousGetArray(); - await hashedStream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken); + var segment = memoryOwner.DangerousGetArray(); + await hashedStream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken); #else - await hashedStream.WriteAsync(memoryOwner.Memory, cancellationToken); + await hashedStream.WriteAsync(memoryOwner.Memory, cancellationToken); #endif - } + } - var p = msg.Metadata?.NumPending; - if (p is 0) - { - pushConsumer.Done(); + var p = msg.Metadata?.NumPending; + if (p is 0) + { + pushConsumer.Done(); + } } } + + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); } - digest = Base64UrlEncoder.Encode(sha256.Hash); - } + if (info.Digest == null + || info.Digest.StartsWith("SHA-256=") == false + || info.Digest.AsSpan().Slice("SHA-256=".Length).SequenceEqual(digest.Span) == false) + { + throw new NatsObjException("SHA-256 digest mismatch"); + } - if ($"SHA-256={digest}" != info.Digest) - { - throw new NatsObjException("SHA-256 digest mismatch"); - } + if (chunks != info.Chunks) + { + throw new NatsObjException("Chunks mismatch"); + } - if (chunks != info.Chunks) - { - throw new NatsObjException("Chunks mismatch"); + if (size != info.Size) + { + throw new NatsObjException("Size mismatch"); + } } - - if (size != info.Size) + finally { - throw new NatsObjException("Size mismatch"); + digest.Dispose(); } return info; @@ -223,117 +232,121 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var chunks = 0; var chunkSize = meta.Options.MaxChunkSize.Value; - string digest; - using (var sha256 = SHA256.Create()) + var digest = NatsMemoryOwner.Empty; + try { + using (var sha256 = SHA256.Create()) + { #if NETSTANDARD2_0 - using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) + using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) #else - await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen)) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen)) #endif - { - while (true) { - var memoryOwner = NatsMemoryOwner.Allocate(chunkSize); - - var memory = memoryOwner.Memory; - var currentChunkSize = 0; - var eof = false; - - // Fill a chunk while (true) { -#if NETSTANDARD2_0 - int read; - if (MemoryMarshal.TryGetArray((ReadOnlyMemory)memory, out var segment) == false) - { - read = await hashedStream.ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken); - } - else + var memoryOwner = NatsMemoryOwner.Allocate(chunkSize); + + var memory = memoryOwner.Memory; + var currentChunkSize = 0; + var eof = false; + + // Fill a chunk + while (true) { - var bytes = ArrayPool.Shared.Rent(memory.Length); - try +#if NETSTANDARD2_0 + int read; + if (MemoryMarshal.TryGetArray((ReadOnlyMemory)memory, out var segment) == false) { - segment = new ArraySegment(bytes, 0, memory.Length); read = await hashedStream.ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken); - segment.Array.AsMemory(0, read).CopyTo(memory); } - finally + else { - ArrayPool.Shared.Return(bytes); + var bytes = ArrayPool.Shared.Rent(memory.Length); + try + { + segment = new ArraySegment(bytes, 0, memory.Length); + read = await hashedStream.ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken); + segment.Array.AsMemory(0, read).CopyTo(memory); + } + finally + { + ArrayPool.Shared.Return(bytes); + } } - } #else - var read = await hashedStream.ReadAsync(memory, cancellationToken); + var read = await hashedStream.ReadAsync(memory, cancellationToken); #endif - // End of stream - if (read == 0) - { - eof = true; - break; - } + // End of stream + if (read == 0) + { + eof = true; + break; + } - memory = memory.Slice(read); - currentChunkSize += read; + memory = memory.Slice(read); + currentChunkSize += read; - // Chunk filled - if (memory.IsEmpty) - { - break; + // Chunk filled + if (memory.IsEmpty) + { + break; + } } - } - if (currentChunkSize > 0) - { - size += currentChunkSize; - chunks++; - } + if (currentChunkSize > 0) + { + size += currentChunkSize; + chunks++; + } - var buffer = memoryOwner.Slice(0, currentChunkSize); + var buffer = memoryOwner.Slice(0, currentChunkSize); - // Chunks - var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); - ack.EnsureSuccess(); + // Chunks + var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); + ack.EnsureSuccess(); - if (eof) - break; + if (eof) + break; + } } - } - if (sha256.Hash == null) - throw new NatsObjException("Can't compute SHA256 hash"); + if (sha256.Hash == null) + throw new NatsObjException("Can't compute SHA256 hash"); - digest = Base64UrlEncoder.Encode(sha256.Hash); - } + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); + } - meta.Chunks = chunks; - meta.Size = size; - meta.Digest = $"SHA-256={digest}"; + meta.Chunks = chunks; + meta.Size = size; + meta.Digest = $"SHA-256={digest}"; - // Metadata - await PublishMeta(meta, cancellationToken); + // Metadata + await PublishMeta(meta, cancellationToken); - // Delete the old object - if (info?.Nuid != null && info.Nuid != nuid) - { - try - { - await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", - request: new StreamPurgeRequest - { - Filter = GetChunkSubject(info.Nuid), - }, - cancellationToken); - } - catch (NatsJSApiException e) + // Delete the old object + if (info?.Nuid != null && info.Nuid != nuid) { - if (e.Error.Code != 404) - throw; + try + { + await _context.JSRequestResponseAsync( + subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", + request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid), }, + cancellationToken); + } + catch (NatsJSApiException e) + { + if (e.Error.Code != 404) + throw; + } } } + finally + { + digest.Dispose(); + } return meta; }