From a28575dc09baf08a20c662bd1c9757473e91d90b Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Mon, 15 Jul 2024 11:25:27 -0400 Subject: [PATCH 1/4] Utilize NatsMemoryOwner for interim parsing --- .../Internal/Encoder.cs | 55 ++-- src/NATS.Client.ObjectStore/NatsObjStore.cs | 244 +++++++++--------- 2 files changed, 164 insertions(+), 135 deletions(-) diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs index 07aaadc1f..ae046acaa 100644 --- a/src/NATS.Client.ObjectStore/Internal/Encoder.cs +++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.Security.Cryptography; +using NATS.Client.Core; namespace NATS.Client.ObjectStore.Internal; @@ -68,19 +69,33 @@ public static string Encode(string arg) /// 'inArray' is null. /// offset or length is negative OR offset plus length is greater than the length of inArray. public static string Encode(Span inArray, bool raw = false) + { + using (var owner = EncodeToMemoryOwner(inArray, raw)) + { + var segment = owner.DangerousGetArray(); + if (segment.Array == null || segment.Array.Length == 0) + { + return String.Empty; + } + + return new string(segment.Array, segment.Offset, segment.Count); + } + } + + public static NatsMemoryOwner EncodeToMemoryOwner(Span inArray, bool raw = false) { var offset = 0; var length = inArray.Length; if (length == 0) - return string.Empty; + return NatsMemoryOwner.Empty; var lengthMod3 = length % 3; var limit = length - lengthMod3; - var output = new char[(length + 2) / 3 * 4]; + var owner = NatsMemoryOwner.Allocate((length + 2) / 3 * 4); var table = SBase64Table; int i, j = 0; - + var output = owner.Span; // takes 3 bytes from inArray and insert 4 bytes into output for (i = offset; i < limit; i += 3) { @@ -101,41 +116,41 @@ public static string Encode(Span inArray, bool raw = false) switch (lengthMod3) { case 2: - { - var d0 = inArray[i]; - var d1 = inArray[i + 1]; + { + var d0 = inArray[i]; + var d1 = inArray[i + 1]; - output[j + 0] = table[d0 >> 2]; - output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; - output[j + 2] = table[(d1 & 0x0f) << 2]; - j += 3; - } + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; + output[j + 2] = table[(d1 & 0x0f) << 2]; + j += 3; + } break; case 1: - { - var d0 = inArray[i]; + { + var d0 = inArray[i]; - output[j + 0] = table[d0 >> 2]; - output[j + 1] = table[(d0 & 0x03) << 4]; - j += 2; - } + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[(d0 & 0x03) << 4]; + j += 2; + } break; - // default or case 0: no further operations are needed. + // default or case 0: no further operations are needed. } if (raw) - return new string(output, 0, j); + return owner.Slice(0, j); for (var k = j; k < output.Length; k++) { output[k] = Base64PadCharacter; } - return new string(output); + return owner; } /// diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 0e5163c6d..9ec07d716 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -94,65 +94,75 @@ public async ValueTask GetAsync(string key, Stream stream, bool pushConsumer.Init(); - string digest; - var chunks = 0; - var size = 0; - using (var sha256 = SHA256.Create()) + var digest = NatsMemoryOwner.Empty; + try { + var chunks = 0; + var size = 0; + using (var sha256 = SHA256.Create()) + { #if NETSTANDARD2_0 - using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) + using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) #else await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen)) #endif - { - await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { - // We have to make sure to carry on consuming the channel to avoid any blocking: - // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally - // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level - // and signal the server that we are a slow consumer); then when we make an request-reply API call to - // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to - // keep consuming the channel to avoid this. - if (pushConsumer.IsDone) - continue; - - if (msg.Data.Length > 0) + await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) { - using var memoryOwner = msg.Data; - chunks++; - size += memoryOwner.Memory.Length; + // We have to make sure to carry on consuming the channel to avoid any blocking: + // e.g. if the channel is full, we would be blocking the reads off the socket (this was intentionally + // done ot avoid bloating the memory with a large backlog of messages or dropping messages at this level + // and signal the server that we are a slow consumer); then when we make an request-reply API call to + // delete the consumer, the socket would be blocked trying to send the response back to us; so we need to + // keep consuming the channel to avoid this. + if (pushConsumer.IsDone) + continue; + + if (msg.Data.Length > 0) + { + using var memoryOwner = msg.Data; + chunks++; + size += memoryOwner.Memory.Length; #if NETSTANDARD2_0 - var segment = memoryOwner.DangerousGetArray(); - await hashedStream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken); + var segment = memoryOwner.DangerousGetArray(); + await hashedStream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken); #else await hashedStream.WriteAsync(memoryOwner.Memory, cancellationToken); #endif - } + } - var p = msg.Metadata?.NumPending; - if (p is 0) - { - pushConsumer.Done(); + var p = msg.Metadata?.NumPending; + if (p is 0) + { + pushConsumer.Done(); + } } } + + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); } - digest = Base64UrlEncoder.Encode(sha256.Hash); - } + if (info.Digest == null + || info.Digest.StartsWith("SHA-256=") == false + || info.Digest.AsSpan().Slice("SHA-256=".Length).SequenceEqual(digest.Span) == false) + { + throw new NatsObjException("SHA-256 digest mismatch"); + } - if ($"SHA-256={digest}" != info.Digest) - { - throw new NatsObjException("SHA-256 digest mismatch"); - } - if (chunks != info.Chunks) - { - throw new NatsObjException("Chunks mismatch"); - } + if (chunks != info.Chunks) + { + throw new NatsObjException("Chunks mismatch"); + } - if (size != info.Size) + if (size != info.Size) + { + throw new NatsObjException("Size mismatch"); + } + } + finally { - throw new NatsObjException("Size mismatch"); + digest.Dispose(); } return info; @@ -223,117 +233,121 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre var chunks = 0; var chunkSize = meta.Options.MaxChunkSize.Value; - string digest; - using (var sha256 = SHA256.Create()) + var digest = NatsMemoryOwner.Empty; + try { + using (var sha256 = SHA256.Create()) + { #if NETSTANDARD2_0 - using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) + using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) #else await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen)) #endif - { - while (true) { - var memoryOwner = NatsMemoryOwner.Allocate(chunkSize); - - var memory = memoryOwner.Memory; - var currentChunkSize = 0; - var eof = false; - - // Fill a chunk while (true) { -#if NETSTANDARD2_0 - int read; - if (MemoryMarshal.TryGetArray((ReadOnlyMemory)memory, out var segment) == false) - { - read = await hashedStream.ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken); - } - else + var memoryOwner = NatsMemoryOwner.Allocate(chunkSize); + + var memory = memoryOwner.Memory; + var currentChunkSize = 0; + var eof = false; + + // Fill a chunk + while (true) { - var bytes = ArrayPool.Shared.Rent(memory.Length); - try +#if NETSTANDARD2_0 + int read; + if (MemoryMarshal.TryGetArray((ReadOnlyMemory)memory, out var segment) == false) { - segment = new ArraySegment(bytes, 0, memory.Length); read = await hashedStream.ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken); - segment.Array.AsMemory(0, read).CopyTo(memory); } - finally + else { - ArrayPool.Shared.Return(bytes); + var bytes = ArrayPool.Shared.Rent(memory.Length); + try + { + segment = new ArraySegment(bytes, 0, memory.Length); + read = await hashedStream.ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken); + segment.Array.AsMemory(0, read).CopyTo(memory); + } + finally + { + ArrayPool.Shared.Return(bytes); + } } - } #else var read = await hashedStream.ReadAsync(memory, cancellationToken); #endif - // End of stream - if (read == 0) - { - eof = true; - break; - } + // End of stream + if (read == 0) + { + eof = true; + break; + } - memory = memory.Slice(read); - currentChunkSize += read; + memory = memory.Slice(read); + currentChunkSize += read; - // Chunk filled - if (memory.IsEmpty) - { - break; + // Chunk filled + if (memory.IsEmpty) + { + break; + } } - } - if (currentChunkSize > 0) - { - size += currentChunkSize; - chunks++; - } + if (currentChunkSize > 0) + { + size += currentChunkSize; + chunks++; + } - var buffer = memoryOwner.Slice(0, currentChunkSize); + var buffer = memoryOwner.Slice(0, currentChunkSize); - // Chunks - var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); - ack.EnsureSuccess(); + // Chunks + var ack = await _context.PublishAsync(GetChunkSubject(nuid), buffer, serializer: NatsRawSerializer>.Default, cancellationToken: cancellationToken); + ack.EnsureSuccess(); - if (eof) - break; + if (eof) + break; + } } - } - if (sha256.Hash == null) - throw new NatsObjException("Can't compute SHA256 hash"); + if (sha256.Hash == null) + throw new NatsObjException("Can't compute SHA256 hash"); - digest = Base64UrlEncoder.Encode(sha256.Hash); - } + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); + } - meta.Chunks = chunks; - meta.Size = size; - meta.Digest = $"SHA-256={digest}"; + meta.Chunks = chunks; + meta.Size = size; + meta.Digest = $"SHA-256={digest}"; - // Metadata - await PublishMeta(meta, cancellationToken); + // Metadata + await PublishMeta(meta, cancellationToken); - // Delete the old object - if (info?.Nuid != null && info.Nuid != nuid) - { - try - { - await _context.JSRequestResponseAsync( - subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", - request: new StreamPurgeRequest - { - Filter = GetChunkSubject(info.Nuid), - }, - cancellationToken); - } - catch (NatsJSApiException e) + // Delete the old object + if (info?.Nuid != null && info.Nuid != nuid) { - if (e.Error.Code != 404) - throw; + try + { + await _context.JSRequestResponseAsync( + subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{Bucket}", + request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid), }, + cancellationToken); + } + catch (NatsJSApiException e) + { + if (e.Error.Code != 404) + throw; + } } } + finally + { + digest.Dispose(); + } return meta; } From 84afc5b0846f4cc97ffb2fda29e3b784354dafd0 Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Mon, 15 Jul 2024 11:28:05 -0400 Subject: [PATCH 2/4] ensure sha buffer is cleared, dotnet format --- .../Internal/Encoder.cs | 37 ++++++++++--------- src/NATS.Client.ObjectStore/NatsObjStore.cs | 17 ++++----- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs index ae046acaa..09d08305c 100644 --- a/src/NATS.Client.ObjectStore/Internal/Encoder.cs +++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs @@ -75,7 +75,7 @@ public static string Encode(Span inArray, bool raw = false) var segment = owner.DangerousGetArray(); if (segment.Array == null || segment.Array.Length == 0) { - return String.Empty; + return string.Empty; } return new string(segment.Array, segment.Offset, segment.Count); @@ -96,6 +96,7 @@ public static NatsMemoryOwner EncodeToMemoryOwner(Span inArray, bool var table = SBase64Table; int i, j = 0; var output = owner.Span; + // takes 3 bytes from inArray and insert 4 bytes into output for (i = offset; i < limit; i += 3) { @@ -116,30 +117,30 @@ public static NatsMemoryOwner EncodeToMemoryOwner(Span inArray, bool switch (lengthMod3) { case 2: - { - var d0 = inArray[i]; - var d1 = inArray[i + 1]; + { + var d0 = inArray[i]; + var d1 = inArray[i + 1]; - output[j + 0] = table[d0 >> 2]; - output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; - output[j + 2] = table[(d1 & 0x0f) << 2]; - j += 3; - } + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[((d0 & 0x03) << 4) | (d1 >> 4)]; + output[j + 2] = table[(d1 & 0x0f) << 2]; + j += 3; + } - break; + break; case 1: - { - var d0 = inArray[i]; + { + var d0 = inArray[i]; - output[j + 0] = table[d0 >> 2]; - output[j + 1] = table[(d0 & 0x03) << 4]; - j += 2; - } + output[j + 0] = table[d0 >> 2]; + output[j + 1] = table[(d0 & 0x03) << 4]; + j += 2; + } - break; + break; - // default or case 0: no further operations are needed. + // default or case 0: no further operations are needed. } if (raw) diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 9ec07d716..9309bd4ae 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -127,7 +127,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool var segment = memoryOwner.DangerousGetArray(); await hashedStream.WriteAsync(segment.Array, segment.Offset, segment.Count, cancellationToken); #else - await hashedStream.WriteAsync(memoryOwner.Memory, cancellationToken); + await hashedStream.WriteAsync(memoryOwner.Memory, cancellationToken); #endif } @@ -139,7 +139,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool } } - digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); } if (info.Digest == null @@ -149,7 +149,6 @@ public async ValueTask GetAsync(string key, Stream stream, bool throw new NatsObjException("SHA-256 digest mismatch"); } - if (chunks != info.Chunks) { throw new NatsObjException("Chunks mismatch"); @@ -281,17 +280,17 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre #endif // End of stream - if (read == 0) + if (read == 0) { eof = true; break; } - memory = memory.Slice(read); - currentChunkSize += read; + memory = memory.Slice(read); + currentChunkSize += read; // Chunk filled - if (memory.IsEmpty) + if (memory.IsEmpty) { break; } @@ -314,10 +313,10 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre } } - if (sha256.Hash == null) + if (sha256.Hash == null) throw new NatsObjException("Can't compute SHA256 hash"); - digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); } meta.Chunks = chunks; From baea9e1c9aa426abd83de8ac0ca548b0fa299bda Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:01:10 -0400 Subject: [PATCH 3/4] fix what dotnet format didnt --- src/NATS.Client.ObjectStore/Internal/Encoder.cs | 4 ++-- src/NATS.Client.ObjectStore/NatsObjStore.cs | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/NATS.Client.ObjectStore/Internal/Encoder.cs b/src/NATS.Client.ObjectStore/Internal/Encoder.cs index 09d08305c..7fcb73dd7 100644 --- a/src/NATS.Client.ObjectStore/Internal/Encoder.cs +++ b/src/NATS.Client.ObjectStore/Internal/Encoder.cs @@ -127,7 +127,7 @@ public static NatsMemoryOwner EncodeToMemoryOwner(Span inArray, bool j += 3; } - break; + break; case 1: { @@ -138,7 +138,7 @@ public static NatsMemoryOwner EncodeToMemoryOwner(Span inArray, bool j += 2; } - break; + break; // default or case 0: no further operations are needed. } diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index 9309bd4ae..f4ef492bd 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -104,7 +104,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool #if NETSTANDARD2_0 using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write)) #else - await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen)) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Write, leaveOpen)) #endif { await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken)) @@ -139,7 +139,7 @@ public async ValueTask GetAsync(string key, Stream stream, bool } } - digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); } if (info.Digest == null @@ -280,17 +280,17 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre #endif // End of stream - if (read == 0) + if (read == 0) { eof = true; break; } - memory = memory.Slice(read); - currentChunkSize += read; + memory = memory.Slice(read); + currentChunkSize += read; // Chunk filled - if (memory.IsEmpty) + if (memory.IsEmpty) { break; } @@ -313,10 +313,10 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre } } - if (sha256.Hash == null) + if (sha256.Hash == null) throw new NatsObjException("Can't compute SHA256 hash"); - digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); + digest = Base64UrlEncoder.EncodeToMemoryOwner(sha256.Hash); } meta.Chunks = chunks; From 7ebd22e92807ec82aefc2b1d61d7450597eeb3cd Mon Sep 17 00:00:00 2001 From: to11mtm <12536917+to11mtm@users.noreply.github.com> Date: Tue, 16 Jul 2024 17:21:50 -0400 Subject: [PATCH 4/4] fix what dotnet format didnt take 2 --- src/NATS.Client.ObjectStore/NatsObjStore.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/NATS.Client.ObjectStore/NatsObjStore.cs b/src/NATS.Client.ObjectStore/NatsObjStore.cs index f4ef492bd..479c43718 100644 --- a/src/NATS.Client.ObjectStore/NatsObjStore.cs +++ b/src/NATS.Client.ObjectStore/NatsObjStore.cs @@ -240,7 +240,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre #if NETSTANDARD2_0 using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read)) #else - await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen)) + await using (var hashedStream = new CryptoStream(stream, sha256, CryptoStreamMode.Read, leaveOpen)) #endif { while (true) @@ -276,7 +276,7 @@ public async ValueTask PutAsync(ObjectMetadata meta, Stream stre } #else - var read = await hashedStream.ReadAsync(memory, cancellationToken); + var read = await hashedStream.ReadAsync(memory, cancellationToken); #endif // End of stream