Skip to content

Commit

Permalink
Test matrix for NATS Server versions (nats-io#84)
Browse files Browse the repository at this point in the history
* Test matrix for NATS Server versions

* Test's PATH issue of dotnet on Linux

* Fixed unsubscribe dispose order

* Cleared warnings
  • Loading branch information
mtmk authored Jul 12, 2023
1 parent c04afd7 commit 0bafc45
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 66 deletions.
28 changes: 21 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ on:
jobs:
dotnet:
name: dotnet
strategy:
fail-fast: false
matrix:
config:
- branch: dev
- branch: main
runs-on: ubuntu-latest
env:
DOTNET_CLI_TELEMETRY_OPTOUT: 1
Expand All @@ -17,10 +23,11 @@ jobs:
steps:
- name: Install nats-server
run: |
docker run --rm --entrypoint cat nats:alpine /usr/local/bin/nats-server \
| sudo tee /usr/local/bin/nats-server >/dev/null
sudo chmod +x /usr/local/bin/nats-server
nats-server -v
curl -sf https://binaries.nats.dev/nats-io/nats-server/v2@${{ matrix.config.branch }} | PREFIX=. sh
sudo mv nats-server /usr/local/bin
- name: Check nats-server
run: nats-server -v

- name: Checkout
uses: actions/checkout@v3
Expand All @@ -38,6 +45,12 @@ jobs:

memory_test:
name: memory test
strategy:
fail-fast: false
matrix:
config:
- branch: dev
- branch: main
runs-on: windows-latest
env:
DOTNET_CLI_TELEMETRY_OPTOUT: 1
Expand All @@ -56,10 +69,11 @@ jobs:

- name: Get nats-server
run: |
Invoke-WebRequest https://github.com/nats-io/nats-server/releases/download/v2.9.17/nats-server-v2.9.17-windows-amd64.zip -OutFile tools-nats-server.zip
Expand-Archive tools-nats-server.zip
mkdir tools-nats-server
cd tools-nats-server
curl -Ls "https://binaries.nats.dev/binary/github.com/nats-io/nats-server/v2?os=windows&arch=amd64&version=${{ matrix.config.branch }}" -o nats-server.exe
$current_path = (Get-Item .).FullName
echo "$current_path\tools-nats-server\nats-server-v2.9.17-windows-amd64" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
echo "$current_path" | Out-File -FilePath $env:GITHUB_PATH -Encoding utf8 -Append
- name: Check nats-server
run: nats-server -v
Expand Down
2 changes: 1 addition & 1 deletion sandbox/NatsBenchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ public struct Vector3

internal static class NatsMsgTestUtils
{
internal static NatsSub<T>? Register<T>(this NatsSub<T>? sub, Action<NatsMsg<T>> action)
internal static NatsSub<T>? Register<T>(this NatsSub<T>? sub, Action<NatsMsg<T?>> action)
{
if (sub == null)
return null;
Expand Down
26 changes: 2 additions & 24 deletions src/NATS.Client.Core/Internal/InboxSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,12 @@ public ValueTask ReceiveAsync(string subject, string? replyTo, ReadOnlySequence<
public ValueTask DisposeAsync() => _manager.RemoveAsync(this);
}

internal class InboxSubBuilder : INatsSubBuilder<InboxSub>, ISubscriptionManager, IAsyncDisposable
internal class InboxSubBuilder : INatsSubBuilder<InboxSub>, ISubscriptionManager
{
private readonly ILogger<InboxSubBuilder> _logger;
private readonly string? _queueGroup;
private readonly ConcurrentDictionary<string, INatsSub> _writers = new();
private readonly string _prefix;
private InboxSub? _sub;
private bool _started;

public InboxSubBuilder(
NatsConnection connection,
string prefix,
string? queueGroup)
{
_logger = connection.Options.LoggerFactory.CreateLogger<InboxSubBuilder>();
_prefix = prefix;
_queueGroup = queueGroup;
Connection = connection;
}

private NatsConnection Connection { get; }
public InboxSubBuilder(ILogger<InboxSubBuilder> logger) => _logger = logger;

public InboxSub Build(string subject, NatsSubOpts? opts, NatsConnection connection, ISubscriptionManager manager, CancellationToken cancellationToken)
{
Expand All @@ -90,13 +75,6 @@ public ValueTask ReceivedAsync(string subject, string? replyTo, in ReadOnlySeque
return sub.ReceiveAsync(subject, replyTo, headersBuffer, payloadBuffer);
}

public ValueTask DisposeAsync()
{
if (_sub != null)
return _sub.DisposeAsync();
return ValueTask.CompletedTask;
}

public ValueTask RemoveAsync(INatsSub sub)
{
Unregister(sub.Subject);
Expand Down
52 changes: 26 additions & 26 deletions src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SubscriptionManager(NatsConnection connection, string inboxPrefix)
_cts = new CancellationTokenSource();
_cleanupInterval = _connection.Options.SubscriptionCleanUpInterval;
_timer = Task.Run(CleanupAsync, _cts.Token);
_inboxSubBuilder = new InboxSubBuilder(connection, inboxPrefix, queueGroup: default);
_inboxSubBuilder = new InboxSubBuilder(connection.Options.LoggerFactory.CreateLogger<InboxSubBuilder>());
_inboxSubSentinel = new InboxSub(_inboxSubBuilder, nameof(_inboxSubSentinel), default, connection, this);
_inboxSub = _inboxSubSentinel;
}
Expand Down Expand Up @@ -93,31 +93,6 @@ public async ValueTask<T> SubscribeAsync<T>(string subject, NatsSubOpts? opts, I
}
}

private async ValueTask<T> SubscribeInternalAsync<T>(string subject, NatsSubOpts? opts, INatsSubBuilder<T> builder, CancellationToken cancellationToken)
where T : INatsSub
{
var sub = builder.Build(subject, opts, connection: _connection, this, cancellationToken);
var sid = GetNextSid();
lock (_gate)
{
_bySid[sid] = new WeakReference<INatsSub>(sub);
_bySub.AddOrUpdate(sub, new SubscriptionMetadata(Sid: sid));
}

try
{
await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken)
.ConfigureAwait(false);
sub.Ready();
return sub;
}
catch
{
await sub.DisposeAsync().ConfigureAwait(false);
throw;
}
}

public ValueTask PublishToClientHandlersAsync(string subject, string? replyTo, int sid, in ReadOnlySequence<byte>? headersBuffer, in ReadOnlySequence<byte> payloadBuffer)
{
int? orphanSid = null;
Expand Down Expand Up @@ -190,6 +165,31 @@ public ValueTask RemoveAsync(INatsSub sub)
return _connection.UnsubscribeAsync(subMetadata.Sid);
}

private async ValueTask<T> SubscribeInternalAsync<T>(string subject, NatsSubOpts? opts, INatsSubBuilder<T> builder, CancellationToken cancellationToken)
where T : INatsSub
{
var sub = builder.Build(subject, opts, connection: _connection, this, cancellationToken);
var sid = GetNextSid();
lock (_gate)
{
_bySid[sid] = new WeakReference<INatsSub>(sub);
_bySub.AddOrUpdate(sub, new SubscriptionMetadata(Sid: sid));
}

try
{
await _connection.SubscribeCoreAsync(sid, subject, opts?.QueueGroup, opts?.MaxMsgs, cancellationToken)
.ConfigureAwait(false);
sub.Ready();
return sub;
}
catch
{
await sub.DisposeAsync().ConfigureAwait(false);
throw;
}
}

private int GetNextSid() => Interlocked.Increment(ref _sid);

private async Task CleanupAsync()
Expand Down
4 changes: 1 addition & 3 deletions src/NATS.Client.Core/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public partial class NatsConnection : IAsyncDisposable, INatsConnection
private ClientOptions _clientOptions;
private UserCredentials? _userCredentials;

internal string InboxPrefix { get; }

public NatsConnection()
: this(NatsOptions.Default)
{
Expand Down Expand Up @@ -92,7 +90,7 @@ public NatsConnection(NatsOptions options)

public HeaderParser HeaderParser { get; }

internal ObjectPool ObjectPool => _pool;
internal string InboxPrefix { get; }

/// <summary>
/// Connect socket and write CONNECT command to nats server.
Expand Down
6 changes: 4 additions & 2 deletions src/NATS.Client.Core/NatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ internal NatsSubBase(
_cts.Token.UnsafeRegister(
static (self, _) =>
{
((NatsSubBase) self!).EndSubscription(NatsSubEndReason.Cancelled);
((NatsSubBase)self!).EndSubscription(NatsSubEndReason.Cancelled);
},
this);
}
Expand Down Expand Up @@ -150,13 +150,15 @@ public ValueTask DisposeAsync()

GC.SuppressFinalize(this);

var unsubscribeAsync = UnsubscribeAsync();

_timeoutTimer?.Dispose();
_idleTimeoutTimer?.Dispose();
_startUpTimeoutTimer?.Dispose();

_cts?.Dispose();

return UnsubscribeAsync();
return unsubscribeAsync;
}

ValueTask INatsSub.ReceiveAsync(
Expand Down
8 changes: 7 additions & 1 deletion tests/NATS.Client.Core.Tests/SubscriptionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@ await Retry.Until(

await Retry.Until(
"unsubscribe message received",
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1);
() => proxy.ClientFrames.Count(f => f.Message.StartsWith("UNSUB")) == 1,
() =>
{
GC.Collect();
return Task.CompletedTask;
},
retryDelay: TimeSpan.FromSeconds(.5));
}

[Fact]
Expand Down
4 changes: 2 additions & 2 deletions tests/NATS.Client.Core.Tests/_Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public static void WaitForTcpPortToClose(int port)

internal static class NatsMsgTestUtils
{
internal static Task Register<T>(this NatsSub<T>? sub, Action<NatsMsg<T>> action)
internal static Task Register<T>(this NatsSub<T>? sub, Action<NatsMsg<T?>> action)
{
if (sub == null)
return Task.CompletedTask;
Expand All @@ -59,7 +59,7 @@ internal static Task Register<T>(this NatsSub<T>? sub, Action<NatsMsg<T>> action
});
}

internal static Task Register<T>(this NatsSub<T>? sub, Func<NatsMsg<T>, Task> action)
internal static Task Register<T>(this NatsSub<T>? sub, Func<NatsMsg<T?>, Task> action)
{
if (sub == null)
return Task.CompletedTask;
Expand Down

0 comments on commit 0bafc45

Please sign in to comment.