Skip to content

Commit

Permalink
Fixed warnings and format
Browse files Browse the repository at this point in the history
Done as separate commit to not clutter PRs.
  • Loading branch information
mtmk committed Nov 9, 2023
1 parent 6a026a5 commit 619764c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public async IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
}
}

CONSUME_LOOP:
CONSUME_LOOP:
if (protocolException != null)
{
if (protocolException
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Services/INatsSvcContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace NATS.Client.Services;
namespace NATS.Client.Services;

/// <summary>
/// NATS Services context.
Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.Core.MemoryTests/NatsConsumeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void Subscription_should_not_be_collected_when_in_consume_async_enumerato

var sub = Task.Run(async () =>
{
await js.CreateStreamAsync("s1", new [] { "s1.*" });
await js.CreateStreamAsync("s1", new[] { "s1.*" });

var consumer = await js.CreateOrderedConsumerAsync("s1");

Expand Down
2 changes: 1 addition & 1 deletion tests/NATS.Client.JetStream.Tests/ClusterTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using NATS.Client.Core.Tests;
using NATS.Client.Core.Tests;

namespace NATS.Client.JetStream.Tests;

Expand Down
110 changes: 55 additions & 55 deletions tests/NATS.Client.TestUtilities/NatsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@ public class NatsServer : IAsyncDisposable
private static readonly string NatsServerPath = $"nats-server{Ext}";
private static readonly Version Version;

private CancellationTokenSource? _cancellationTokenSource;
private string? _configFileName;
private readonly string? _jetStreamStoreDir;
private readonly ITestOutputHelper _outputHelper;
private Task<string[]> _processOut;
private Task<string[]> _processErr;
private readonly TransportType _transportType;
private readonly OutputHelperLoggerFactory _loggerFactory;
private CancellationTokenSource? _cancellationTokenSource;
private Task<string[]>? _processOut;
private Task<string[]>? _processErr;
private string? _configFileName;
private int _disposed;

public Process ServerProcess { get; private set; }

static NatsServer()
{
var process = new Process
Expand Down Expand Up @@ -72,51 +70,7 @@ private NatsServer(ITestOutputHelper outputHelper, NatsServerOpts opts)
}
}

public void StartServerProcess()
{
_cancellationTokenSource = new CancellationTokenSource();

(_configFileName, var config, var cmd) = GetCmd(Opts);

_outputHelper.WriteLine("ProcessStart: " + cmd + Environment.NewLine + config);
var (p, stdout, stderr) = ProcessX.GetDualAsyncEnumerable(cmd);
ServerProcess = p;
_processOut = EnumerateWithLogsAsync(stdout, _cancellationTokenSource.Token);
_processErr = EnumerateWithLogsAsync(stderr, _cancellationTokenSource.Token);

// Check for start server
Task.Run(async () =>
{
using var client = new TcpClient();
while (!_cancellationTokenSource.IsCancellationRequested)
{
try
{
await client.ConnectAsync("127.0.0.1", Opts.ServerPort, _cancellationTokenSource.Token);
if (client.Connected)
return;
}
catch
{
// ignore
}

await Task.Delay(500, _cancellationTokenSource.Token);
}
}).Wait(5000); // timeout

if (_processOut.IsFaulted)
{
_processOut.GetAwaiter().GetResult(); // throw exception
}

if (_processErr.IsFaulted)
{
_processErr.GetAwaiter().GetResult(); // throw exception
}

_outputHelper.WriteLine("OK to Process Start, Port:" + Opts.ServerPort);
}
public Process? ServerProcess { get; private set; }

public NatsServerOpts Opts { get; }

Expand Down Expand Up @@ -205,14 +159,60 @@ public static NatsServer Start(ITestOutputHelper outputHelper, NatsServerOpts op
throw new Exception("Can't start nats-server and connect to it");
}

public void StartServerProcess()
{
_cancellationTokenSource = new CancellationTokenSource();

(_configFileName, var config, var cmd) = GetCmd(Opts);

_outputHelper.WriteLine("ProcessStart: " + cmd + Environment.NewLine + config);
var (p, stdout, stderr) = ProcessX.GetDualAsyncEnumerable(cmd);
ServerProcess = p;
_processOut = EnumerateWithLogsAsync(stdout, _cancellationTokenSource.Token);
_processErr = EnumerateWithLogsAsync(stderr, _cancellationTokenSource.Token);

// Check for start server
Task.Run(async () =>
{
using var client = new TcpClient();
while (!_cancellationTokenSource.IsCancellationRequested)
{
try
{
await client.ConnectAsync("127.0.0.1", Opts.ServerPort, _cancellationTokenSource.Token);
if (client.Connected)
return;
}
catch
{
// ignore
}

await Task.Delay(500, _cancellationTokenSource.Token);
}
}).Wait(5000); // timeout

if (_processOut.IsFaulted)
{
_processOut.GetAwaiter().GetResult(); // throw exception
}

if (_processErr.IsFaulted)
{
_processErr.GetAwaiter().GetResult(); // throw exception
}

_outputHelper.WriteLine("OK to Process Start, Port:" + Opts.ServerPort);
}

public async ValueTask RestartAsync()
{
var t1 = ServerProcess.StartTime;
var t1 = ServerProcess?.StartTime;

await StopAsync();
StartServerProcess();

var t2 = ServerProcess.StartTime;
var t2 = ServerProcess?.StartTime;

if (t1 == t2)
throw new Exception("Can't restart nats-server");
Expand All @@ -225,7 +225,7 @@ public async ValueTask StopAsync()
_cancellationTokenSource?.Cancel(); // trigger of process kill.
_cancellationTokenSource?.Dispose();

var processLogs = await _processErr; // wait for process exit, nats output info to stderror
var processLogs = await _processErr!; // wait for process exit, nats output info to stderror
if (processLogs.Length != 0)
{
_outputHelper.WriteLine("Process Logs of " + Opts.ServerPort);
Expand Down Expand Up @@ -255,7 +255,7 @@ public async ValueTask DisposeAsync()
_cancellationTokenSource?.Cancel(); // trigger of process kill.
_cancellationTokenSource?.Dispose();

var processLogs = await _processErr; // wait for process exit, nats output info to stderror
var processLogs = await _processErr!; // wait for process exit, nats output info to stderror
if (processLogs.Length != 0)
{
_outputHelper.WriteLine("Process Logs of " + Opts.ServerPort);
Expand Down

0 comments on commit 619764c

Please sign in to comment.