Skip to content

Commit

Permalink
JetStream double ACK and NAK delay (#210)
Browse files Browse the repository at this point in the history
* Messages received by the consumer can be acknowledged optionally asking the
  server acknowledging the message acknowledgment.
* Messages can be 'rejected' (-NAK) with a delay.
  • Loading branch information
mtmk authored Nov 15, 2023
1 parent a463119 commit 1b2eb75
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/Internal/NatsJSConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace NATS.Client.JetStream.Internal;
internal static class NatsJSConstants
{
public static readonly ReadOnlySequence<byte> Ack = new(Encoding.ASCII.GetBytes("+ACK"));
public static readonly ReadOnlySequence<byte> Nack = new(Encoding.ASCII.GetBytes("-NAK"));
public static readonly ReadOnlySequence<byte> Nak = new(Encoding.ASCII.GetBytes("-NAK"));
public static readonly ReadOnlySequence<byte> AckProgress = new(Encoding.ASCII.GetBytes("+WPI"));
public static readonly ReadOnlySequence<byte> AckTerminate = new(Encoding.ASCII.GetBytes("+TERM"));
}
64 changes: 53 additions & 11 deletions src/NATS.Client.JetStream/NatsJSMsg.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Text;
using NATS.Client.Core;
using NATS.Client.JetStream.Internal;

Expand Down Expand Up @@ -57,6 +59,8 @@ internal NatsJSMsg(NatsMsg<T> msg, NatsJSContext context)
/// </summary>
public NatsJSMsgMetadata? Metadata => _replyToDateTimeAndSeq.Value;

private string? ReplyTo => _msg.ReplyTo;

/// <summary>
/// Reply with an empty message.
/// </summary>
Expand All @@ -79,13 +83,26 @@ public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = de
/// <summary>
/// Signals that the message will not be processed now and processing can move onto the next message.
/// </summary>
/// <param name="delay">Delay redelivery of the message.</param>
/// <param name="opts">Ack options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <returns>A <see cref="ValueTask"/> representing the async call.</returns>
/// <remarks>
/// Messages rejected using <c>NACK</c> will be resent by the NATS JetStream server after the configured timeout.
/// Messages rejected using <c>-NAK</c> will be resent by the NATS JetStream server after the configured timeout
/// or the delay parameter if it's specified.
/// </remarks>
public ValueTask NackAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.Nack, opts, cancellationToken);
public ValueTask NakAsync(AckOpts opts = default, TimeSpan delay = default, CancellationToken cancellationToken = default)
{
if (delay == default)
{
return SendAckAsync(NatsJSConstants.Nak, opts, cancellationToken);
}
else
{
var nakDelayed = new ReadOnlySequence<byte>(Encoding.ASCII.GetBytes($"-NAK {{\"delay\": {delay.ToNanos()}}}"));
return SendAckAsync(nakDelayed, opts, cancellationToken);
}
}

/// <summary>
/// Indicates that work is ongoing and the wait period should be extended.
Expand Down Expand Up @@ -113,23 +130,48 @@ public ValueTask ReplyAsync(NatsHeaders? headers = default, string? replyTo = de
/// <returns>A <see cref="ValueTask"/> representing the async call.</returns>
public ValueTask AckTerminateAsync(AckOpts opts = default, CancellationToken cancellationToken = default) => SendAckAsync(NatsJSConstants.AckTerminate, opts, cancellationToken);

private ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opts = default, CancellationToken cancellationToken = default)
private async ValueTask SendAckAsync(ReadOnlySequence<byte> payload, AckOpts opts = default, CancellationToken cancellationToken = default)
{
CheckPreconditions();

if (_msg == default)
throw new NatsJSException("No user message, can't acknowledge");

return _msg.ReplyAsync(
data: payload,
opts: new NatsPubOpts
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
cancellationToken: cancellationToken);
if ((opts.DoubleAck ?? _context.Opts.AckOpts.DoubleAck) == true)
{
await Connection.RequestAsync<ReadOnlySequence<byte>, object?>(ReplyTo, payload, cancellationToken: cancellationToken);
}
else
{
await _msg.ReplyAsync(
data: payload,
opts: new NatsPubOpts
{
WaitUntilSent = opts.WaitUntilSent ?? _context.Opts.AckOpts.WaitUntilSent,
},
cancellationToken: cancellationToken);
}
}

[MemberNotNull(nameof(Connection))]
[MemberNotNull(nameof(ReplyTo))]
private void CheckPreconditions()
{
if (Connection == default)
{
throw new NatsException("unable to send acknowledgment; message did not originate from a consumer");
}

if (string.IsNullOrWhiteSpace(ReplyTo))
{
throw new NatsException("unable to send acknowledgment; ReplyTo is empty");
}
}
}

/// <summary>
/// Options to be used when acknowledging messages received from a stream using a consumer.
/// </summary>
/// <param name="WaitUntilSent">Wait for the publish to be flushed down to the network.</param>
public readonly record struct AckOpts(bool? WaitUntilSent);
/// <param name="DoubleAck">Ask server for an acknowledgment.</param>
public readonly record struct AckOpts(bool? WaitUntilSent = false, bool? DoubleAck = false);
78 changes: 78 additions & 0 deletions tests/NATS.Client.JetStream.Tests/DoubleAckNakDelayTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using System.Text.RegularExpressions;
using NATS.Client.Core.Tests;

namespace NATS.Client.JetStream.Tests;

public class DoubleAckNakDelayTests
{
private readonly ITestOutputHelper _output;

public DoubleAckNakDelayTests(ITestOutputHelper output) => _output = output;

[Fact]
public async Task Double_ack_received_messages()
{
await using var server = NatsServer.StartJS();
var (nats1, proxy) = server.CreateProxiedClientConnection();
await using var nats = nats1;
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token);
ack.EnsureSuccess();
var next = await consumer.NextAsync<int>(cancellationToken: cts.Token);
if (next is { } msg)
{
await msg.AckAsync(new AckOpts(DoubleAck: true), cts.Token);
Assert.Equal(42, msg.Data);

await Retry.Until("seen ACK", () => proxy.Frames.Any(f => f.Message.StartsWith("PUB $JS.ACK")));

var ackFrame = proxy.Frames.Single(f => f.Message.StartsWith("PUB $JS.ACK"));
var inbox = Regex.Match(ackFrame.Message, @"\s(_INBOX\.\w+\.\w+)\s+\d").Groups[1].Value;

await Retry.Until("seen ACK-ACK", () => proxy.Frames.Any(f => f.Message.StartsWith($"MSG {inbox}")));
}
else
{
Assert.Fail("No message received");
}
}

[Fact]
public async Task Delay_nak_received_messages()
{
await using var server = NatsServer.StartJS();
var (nats1, proxy) = server.CreateProxiedClientConnection();
await using var nats = nats1;
var js = new NatsJSContext(nats);

var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token);
var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token);

var ack = await js.PublishAsync("s1.foo", 42, cancellationToken: cts.Token);
ack.EnsureSuccess();
var next = await consumer.NextAsync<int>(cancellationToken: cts.Token);
if (next is { } msg)
{
await msg.NakAsync(delay: TimeSpan.FromSeconds(123), cancellationToken: cts.Token);
Assert.Equal(42, msg.Data);

await Retry.Until("seen ACK", () => proxy.Frames.Any(f => f.Message.StartsWith("PUB $JS.ACK")));

var nakFrame = proxy.Frames.Single(f => f.Message.StartsWith("PUB $JS.ACK"));

Assert.Matches(@"-NAK\s+\{\s*""delay""\s*:\s*123000000000\s*\}", nakFrame.Message);
}
else
{
Assert.Fail("No message received");
}
}
}

0 comments on commit 1b2eb75

Please sign in to comment.