Skip to content

Commit

Permalink
return ERROR token violation
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Apr 13, 2022
1 parent c3cb8ee commit f7d458e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 9 deletions.
4 changes: 2 additions & 2 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public static async Task Main()
.AddLogging(x =>
{
x.ClearProviders();
x.SetMinimumLevel(LogLevel.Information);
x.SetMinimumLevel(LogLevel.Trace);
x.AddZLoggerConsole();
})
.BuildServiceProvider();
Expand All @@ -40,7 +40,7 @@ public static async Task Main()
Serializer = new MessagePackNatsSerializer(),
ConnectTimeout = TimeSpan.FromSeconds(1),
ConnectOptions = ConnectOptions.Default with { Echo = true, Verbose = false },
PingInterval = TimeSpan.FromSeconds(5)
PingInterval = TimeSpan.Zero
};


Expand Down
21 changes: 15 additions & 6 deletions src/AlterNats/NatsConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,25 @@ async ValueTask InitialConnectAsync()

// Connected completely but still ConnnectionState is Connecting(require after receive INFO).

// add CONNECT command to priority lane
// add CONNECT and PING command to priority lane
var connectCommand = AsyncConnectCommand.Create(pool, Options.ConnectOptions);
writerState.PriorityCommands.Add(connectCommand);
writerState.PriorityCommands.Add(PingCommand.Create(pool));

// Run Reader/Writer LOOP start
var waitForInfoSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var waitForPongOrErrorSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
this.socketWriter = new NatsPipeliningWriteProtocolProcessor(socket, writerState, pool);
this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal);
this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal, waitForPongOrErrorSignal);

try
{
await connectCommand.AsValueTask().ConfigureAwait(false);
// before send connect, wait INFO.
await waitForInfoSignal.Task.ConfigureAwait(false);
// send COMMAND and PING
await connectCommand.AsValueTask().ConfigureAwait(false);
// receive COMMAND response(PONG or ERROR)
await waitForPongOrErrorSignal.Task.ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -287,22 +293,25 @@ async void ReconnectLoopAsync()
goto CONNECT_AGAIN;
}

// add CONNECT command to priority lane
// add CONNECT and PING command to priority lane
var connectCommand = AsyncConnectCommand.Create(pool, Options.ConnectOptions);
writerState.PriorityCommands.Add(connectCommand);
writerState.PriorityCommands.Add(PingCommand.Create(pool));

// Add SUBSCRIBE command to priority lane
var subscribeCommand = AsyncSubscribeBatchCommand.Create(pool, subscriptionManager.GetExistingSubscriptions());
writerState.PriorityCommands.Add(subscribeCommand);

// Run Reader/Writer LOOP start
var waitForInfoSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
var waitForPongOrErrorSignal = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
this.socketWriter = new NatsPipeliningWriteProtocolProcessor(socket, writerState, pool);
this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal);
this.socketReader = new NatsReadProtocolProcessor(socket, this, waitForInfoSignal, waitForPongOrErrorSignal);

await waitForInfoSignal.Task.ConfigureAwait(false);
await connectCommand.AsValueTask().ConfigureAwait(false);
await waitForPongOrErrorSignal.Task.ConfigureAwait(false);
await subscribeCommand.AsValueTask().ConfigureAwait(false);
await waitForInfoSignal.Task.ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
8 changes: 7 additions & 1 deletion src/AlterNats/NatsReadProtocolProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ internal sealed class NatsReadProtocolProcessor : IAsyncDisposable
readonly SocketReader socketReader;
readonly Task readLoop;
readonly TaskCompletionSource waitForInfoSignal;
readonly TaskCompletionSource waitForPongOrErrorSignal; // wait for initial connection
readonly ConcurrentQueue<AsyncPingCommand> pingCommands; // wait for pong
readonly ILogger<NatsReadProtocolProcessor> logger;
readonly bool isEnabledTraceLogging;
int disposed;

public NatsReadProtocolProcessor(TcpConnection socket, NatsConnection connection, TaskCompletionSource waitForInfoSignal)
public NatsReadProtocolProcessor(TcpConnection socket, NatsConnection connection, TaskCompletionSource waitForInfoSignal, TaskCompletionSource waitForPongOrErrorSignal)
{
this.socket = socket;
this.connection = connection;
this.logger = connection.Options.LoggerFactory.CreateLogger<NatsReadProtocolProcessor>();
this.isEnabledTraceLogging = logger.IsEnabled(LogLevel.Trace);
this.waitForInfoSignal = waitForInfoSignal;
this.waitForPongOrErrorSignal = waitForPongOrErrorSignal;
this.pingCommands = new ConcurrentQueue<AsyncPingCommand>();
this.socketReader = new SocketReader(socket, connection.Options.ReaderBufferSize, connection.Options.LoggerFactory);
this.readLoop = Task.Run(ReadLoopAsync);
Expand Down Expand Up @@ -242,6 +244,7 @@ async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, in ReadOn
const int PongSize = 6; // PONG\r\n

connection.ResetPongCount(); // reset count for PingTimer
waitForPongOrErrorSignal.TrySetResult(); // set for initial connect

if (pingCommands.TryDequeue(out var pingCommand))
{
Expand Down Expand Up @@ -270,12 +273,14 @@ async ValueTask<ReadOnlySequence<byte>> DispatchCommandAsync(int code, in ReadOn
var newPosition = newBuffer.PositionOf((byte)'\n');
var error = ParseError(newBuffer.Slice(0, buffer.GetOffset(newPosition!.Value) - 1));
logger.LogError(error);
waitForPongOrErrorSignal.TrySetException(new NatsException(error));
return newBuffer.Slice(newBuffer.GetPosition(1, newPosition!.Value));
}
else
{
var error = ParseError(buffer.Slice(0, buffer.GetOffset(position.Value) - 1));
logger.LogError(error);
waitForPongOrErrorSignal.TrySetException(new NatsException(error));
return buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
Expand Down Expand Up @@ -424,6 +429,7 @@ public async ValueTask DisposeAsync()
item.SetCanceled(CancellationToken.None);
}
waitForInfoSignal.TrySetCanceled();
waitForPongOrErrorSignal.TrySetCanceled();
}
}

Expand Down
4 changes: 4 additions & 0 deletions tools/start-nats-server-withtoken.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
@REM https://docs.nats.io/running-a-nats-service/introduction/flags
@REM -DV is Debug and Protocol Trace
@REM -DVV is Debug and Verbose
nats-server.exe -DV --auth s3cr3t

0 comments on commit f7d458e

Please sign in to comment.