Skip to content

Commit

Permalink
Merge branch 'main' into nats-client-js-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Sep 9, 2024
2 parents 5b32c3e + 323124a commit dc5668a
Show file tree
Hide file tree
Showing 25 changed files with 604 additions and 84 deletions.
10 changes: 2 additions & 8 deletions .github/workflows/perf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@ jobs:
steps:
- name: Install nats
run: |
rel=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s https://api.github.com/repos/nats-io/natscli/releases/latest | jq -r .tag_name | sed s/v//)
rel=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/natscli/latest | sed s/v//)
wget https://github.com/nats-io/natscli/releases/download/v$rel/nats-$rel-linux-amd64.zip
unzip nats-$rel-linux-amd64.zip
sudo mv nats-$rel-linux-amd64/nats /usr/local/bin
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down
16 changes: 2 additions & 14 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@ jobs:
steps:
- name: Install nats-server
run: |
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down Expand Up @@ -119,13 +113,7 @@ jobs:
shell: bash
run: |
mkdir tools-nats-server && cd tools-nats-server
gh_api_url="https://api.github.com/repos/nats-io/nats-server/releases"
branch="${{ matrix.config.branch }}"
if [[ $branch == "v"* ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url | jq -r '.[].tag_name' | grep $branch | sort -V | tail -1)
elif [[ $branch == "latest" ]]; then
branch=$(curl -H 'Authorization: token ${{ secrets.AUTH_TOKEN_FOR_GITHUB_API }}' -s $gh_api_url/latest | jq -r .tag_name)
fi
branch=$(curl https://api.mtmk.dev/gh/v1/releases/tag/nats-io/nats-server/${{ matrix.config.branch }})
for i in 1 2 3
do
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@$branch | PREFIX=. sh && break || sleep 30
Expand Down
4 changes: 2 additions & 2 deletions sandbox/MicroBenchmark/NewInboxBenchmarks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ public class NewInboxBenchmarks
[GlobalSetup]
public void Setup()
{
NuidWriter.TryWriteNuid(new char[100]);
Nuid.TryWriteNuid(new char[100]);
}

[Benchmark(Baseline = true)]
[SkipLocalsInit]
public bool TryWriteNuid()
{
return NuidWriter.TryWriteNuid(_buf);
return Nuid.TryWriteNuid(_buf);
}

[Benchmark]
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void WriteConnect(IBufferWriter<byte> writer, ClientOpts opts)
BinaryPrimitives.WriteUInt64LittleEndian(span, ConnectSpace);
writer.Advance(ConnectSpaceLength);

var jsonWriter = new Utf8JsonWriter(writer);
using var jsonWriter = new Utf8JsonWriter(writer);
JsonSerializer.Serialize(jsonWriter, opts, JsonContext.Default.ClientOpts);

span = writer.GetSpan(UInt16Length);
Expand Down
7 changes: 7 additions & 0 deletions src/NATS.Client.Core/Internal/SslStreamConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ public void SignalDisconnected(Exception exception)
public async Task AuthenticateAsClientAsync(NatsUri uri, TimeSpan timeout)
{
var options = await _tlsOpts.AuthenticateAsClientOptionsAsync(uri).ConfigureAwait(true);

#if NETSTANDARD2_0
if (_sslStream != null)
_sslStream.Dispose();

_sslStream = new SslStream(
innerStream: new NetworkStream(_socket, true),
leaveInnerStreamOpen: false,
Expand All @@ -134,6 +138,9 @@ await _sslStream.AuthenticateAsClientAsync(
throw new NatsException($"TLS authentication failed", ex);
}
#else
if (_sslStream != null)
await _sslStream.DisposeAsync().ConfigureAwait(false);

_sslStream = new SslStream(innerStream: new NetworkStream(_socket, true));
try
{
Expand Down
14 changes: 12 additions & 2 deletions src/NATS.Client.Core/Internal/WebSocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ public Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
/// <summary>
/// Connect with Timeout. When failed, Dispose this connection.
/// </summary>
public async ValueTask ConnectAsync(Uri uri, TimeSpan timeout)
public async ValueTask ConnectAsync(Uri uri, NatsOpts opts)
{
using var cts = new CancellationTokenSource(timeout);
using var cts = new CancellationTokenSource(opts.ConnectTimeout);
try
{
await InvokeCallbackForClientWebSocketOptionsAsync(opts, uri, _socket.Options, cts.Token).ConfigureAwait(false);
await _socket.ConnectAsync(uri, cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
Expand Down Expand Up @@ -130,4 +131,13 @@ public void SignalDisconnected(Exception exception)
{
_waitForClosedSource.TrySetResult(exception);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private async Task InvokeCallbackForClientWebSocketOptionsAsync(NatsOpts opts, Uri uri, ClientWebSocketOptions options, CancellationToken token)
{
if (opts.ConfigureWebSocketOpts != null)
{
await opts.ConfigureWebSocketOpts(uri, options, token).ConfigureAwait(false);
}
}
}
3 changes: 2 additions & 1 deletion src/NATS.Client.Core/NKeyPair.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public byte[] Sign(byte[] src)
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
Expand Down Expand Up @@ -269,7 +270,7 @@ internal static string Encode(byte prefixbyte, bool seed, byte[] src)
if (src.Length != 32)
throw new NatsException("Invalid seed size");

var stream = new MemoryStream();
using var stream = new MemoryStream();

if (seed)
{
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.Core/NatsConnection.RequestReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ static void WriteBuffer(Span<char> buffer, (string prefix, int pfxLen) state)
state.prefix.AsSpan().CopyTo(buffer);
buffer[state.prefix.Length] = '.';
var remaining = buffer.Slice(state.pfxLen);
var didWrite = NuidWriter.TryWriteNuid(remaining);
var didWrite = Nuid.TryWriteNuid(remaining);
Debug.Assert(didWrite, "didWrite");
}

var separatorLength = prefix.Length > 0 ? 1 : 0;
var totalLength = prefix.Length + (int)NuidWriter.NuidLength + separatorLength;
var totalLength = prefix.Length + (int)Nuid.NuidLength + separatorLength;
var totalPrefixLength = prefix.Length + separatorLength;

#if NET6_0_OR_GREATER || NETSTANDARD2_1
Expand Down
6 changes: 3 additions & 3 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ private async ValueTask InitialConnectAsync()
if (uri.IsWebSocket)
{
var conn = new WebSocketConnection();
await conn.ConnectAsync(uri.Uri, Opts.ConnectTimeout).ConfigureAwait(false);
await conn.ConnectAsync(uri.Uri, Opts).ConfigureAwait(false);
_socket = conn;
}
else
Expand Down Expand Up @@ -609,7 +609,7 @@ private async void ReconnectLoop()
{
_logger.LogDebug(NatsLogEvents.Connection, "Trying to reconnect using WebSocket {Url} [{ReconnectCount}]", url, reconnectCount);
var conn = new WebSocketConnection();
await conn.ConnectAsync(url.Uri, Opts.ConnectTimeout).ConfigureAwait(false);
await conn.ConnectAsync(url.Uri, Opts).ConfigureAwait(false);
_socket = conn;
}
else
Expand Down Expand Up @@ -851,7 +851,7 @@ private async void StartPingTimer(CancellationToken cancellationToken)

_logger.LogDebug(NatsLogEvents.Connection, "Starting ping timer");

var periodicTimer = new PeriodicTimer(Opts.PingInterval);
using var periodicTimer = new PeriodicTimer(Opts.PingInterval);
ResetPongCount();
try
{
Expand Down
23 changes: 23 additions & 0 deletions src/NATS.Client.Core/NatsOpts.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Net.WebSockets;
using System.Text;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -114,6 +115,28 @@ public sealed record NatsOpts
/// </remarks>
public BoundedChannelFullMode SubPendingChannelFullMode { get; init; } = BoundedChannelFullMode.DropNewest;

/// <summary>
/// An optional async callback handler for manipulation of ClientWebSocketOptions used for WebSocket connections.
/// </summary>
/// <remarks>
/// This can be used to set authorization header and other HTTP header values.
/// Note: Setting HTTP header values is not supported by Blazor WebAssembly as the underlying browser implementation does not support adding headers to a WebSocket.
/// The callback's execution time contributes to the connection establishment subject to the <see cref="ConnectTimeout"/>.
/// Implementors should use the passed CancellationToken for async operations called by this handler.
/// </remarks>
/// <example>
/// await using var nats = new NatsConnection(new NatsOpts
/// {
/// Url = "ws://localhost:8080",
/// ConfigureWebSocketOpts = (serverUri, clientWsOpts, ct) =>
/// {
/// clientWsOpts.SetRequestHeader("authorization", $"Bearer MY_TOKEN");
/// return ValueTask.CompletedTask;
/// },
/// });
/// </example>
public Func<Uri, ClientWebSocketOptions, CancellationToken, ValueTask>? ConfigureWebSocketOpts { get; init; } = null;

internal NatsUri[] GetSeedUris()
{
var urls = Url.Split(',');
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/NatsSubBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ protected void ResetIdleTimeout()
if (_startUpTimeoutTimer != null)
{
_startUpTimeoutTimer.Change(dueTime: Timeout.InfiniteTimeSpan, period: Timeout.InfiniteTimeSpan);
_startUpTimeoutTimer = null;
_startUpTimeoutTimer.Dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@
using Random = NATS.Client.Core.Internal.NetStandardExtensions.Random;
#endif

namespace NATS.Client.Core.Internal;

namespace NATS.Client.Core;

/// <summary>
/// Represents a unique identifier generator.
/// </summary>
/// <remarks>
/// The <c>Nuid</c> class generates unique identifiers that can be used
/// to ensure uniqueness in distributed systems.
/// </remarks>
[SkipLocalsInit]
internal sealed class NuidWriter
public sealed class Nuid
{
// NuidLength, PrefixLength, SequentialLength were nuint (System.UIntPtr) in the original code
// however, they were changed to uint to fix the compilation error for IL2CPP Unity projects.
Expand All @@ -25,13 +32,13 @@ internal sealed class NuidWriter
private const int MaxIncrement = 333;

[ThreadStatic]
private static NuidWriter? _writer;
private static Nuid? _writer;

private char[] _prefix;
private ulong _increment;
private ulong _sequential;

private NuidWriter()
private Nuid()
{
Refresh(out _);
}
Expand All @@ -42,25 +49,30 @@ private NuidWriter()
private static ReadOnlySpan<char> Digits => "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
#endif

public static bool TryWriteNuid(Span<char> nuidBuffer)
/// <summary>
/// Generates a new NATS unique identifier (NUID).
/// </summary>
/// <returns>A new NUID as a string.</returns>
/// <exception cref="InvalidOperationException">Thrown when unable to generate the NUID.</exception>
public static string NewNuid()
{
if (_writer is not null)
Span<char> buffer = stackalloc char[(int)NuidLength];
if (TryWriteNuid(buffer))
{
return _writer.TryWriteNuidCore(nuidBuffer);
return buffer.ToString();
}

return InitAndWrite(nuidBuffer);
throw new InvalidOperationException("Internal error: can't generate nuid");
}

public static string NewNuid()
internal static bool TryWriteNuid(Span<char> nuidBuffer)
{
Span<char> buffer = stackalloc char[22];
if (TryWriteNuid(buffer))
if (_writer is not null)
{
return buffer.ToString();
return _writer.TryWriteNuidCore(nuidBuffer);
}

throw new InvalidOperationException("Internal error: can't generate nuid");
return InitAndWrite(nuidBuffer);
}

private static bool TryWriteNuidCore(Span<char> buffer, Span<char> prefix, ulong sequential)
Expand Down Expand Up @@ -140,7 +152,7 @@ private static char[] GetPrefix(RandomNumberGenerator? rng = null)
[MethodImpl(MethodImplOptions.NoInlining)]
private static bool InitAndWrite(Span<char> span)
{
_writer = new NuidWriter();
_writer = new Nuid();
return _writer.TryWriteNuidCore(span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace NATS.Client.JetStream.Internal;

internal class NatsJSNotificationChannel : IAsyncDisposable
internal sealed class NatsJSNotificationChannel : IAsyncDisposable
{
private readonly Func<INatsJSNotification, CancellationToken, Task> _notificationHandler;
private readonly Action<Exception> _exceptionHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ public async ValueTask DisposeAsync()
{
_nats.ConnectionDisconnected -= OnDisconnected;

#if NETSTANDARD2_0
_timer.Dispose();
#else
await _timer.DisposeAsync().ConfigureAwait(false);
#endif

// For correctly Dispose,
// first stop the consumer Creation operations and then the command execution operations.
// It is necessary that all consumerCreation operations have time to complete before command CommandLoop stop
Expand Down Expand Up @@ -390,7 +396,7 @@ await _context.CreateOrUpdateConsumerAsync(
private string NewNuid()
{
Span<char> buffer = stackalloc char[22];
if (NuidWriter.TryWriteNuid(buffer))
if (Nuid.TryWriteNuid(buffer))
{
return buffer.ToString();
}
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.JetStream/NatsJSContext.Consumers.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System.Runtime.CompilerServices;
using NATS.Client.Core.Internal;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;
Expand Down Expand Up @@ -228,7 +228,7 @@ internal ValueTask<ConsumerInfo> CreateOrderedConsumerInternalAsync(
request.Config.FilterSubjects = opts.FilterSubjects;
}

var name = NuidWriter.NewNuid();
var name = Nuid.NewNuid();
var subject = $"{Opts.Prefix}.CONSUMER.CREATE.{stream}.{name}";

return JSRequestResponseAsync<ConsumerCreateRequest, ConsumerInfo>(
Expand Down
10 changes: 8 additions & 2 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public NatsKVWatchCommandMsg()
public NatsJSMsg<T> Msg { get; init; } = default;
}

internal class NatsKVWatcher<T> : IAsyncDisposable
internal sealed class NatsKVWatcher<T> : IAsyncDisposable
{
private readonly ILogger _logger;
private readonly bool _debug;
Expand Down Expand Up @@ -144,6 +144,12 @@ public async ValueTask DisposeAsync()
await _sub.DisposeAsync();
}

#if NETSTANDARD2_0
_timer.Dispose();
#else
await _timer.DisposeAsync().ConfigureAwait(false);
#endif

_consumerCreateChannel.Writer.TryComplete();
_commandChannel.Writer.TryComplete();
_entryChannel.Writer.TryComplete();
Expand Down Expand Up @@ -444,7 +450,7 @@ private async ValueTask<INatsJSConsumer> CreatePushConsumer(string origin)
private string NewNuid()
{
Span<char> buffer = stackalloc char[22];
if (NuidWriter.TryWriteNuid(buffer))
if (Nuid.TryWriteNuid(buffer))
{
return buffer.ToString();
}
Expand Down
Loading

0 comments on commit dc5668a

Please sign in to comment.