Skip to content

Commit

Permalink
Add option to rollback session (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
Havret authored Jun 10, 2024
1 parent 879d556 commit 5eb9cd1
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 2 deletions.
20 changes: 20 additions & 0 deletions src/ArtemisNetCoreClient/Framing/Outgoing/RollbackMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using ActiveMQ.Artemis.Core.Client.InternalUtilities;

namespace ActiveMQ.Artemis.Core.Client.Framing.Outgoing;

internal readonly struct RollbackMessage : IOutgoingPacket
{
public PacketType PacketType => PacketType.SessionRollbackMessage;

public required bool ConsiderLastMessageAsDelivered { get; init; }

public int GetRequiredBufferSize()
{
return sizeof(bool);
}

public int Encode(Span<byte> buffer)
{
return ArtemisBinaryConverter.WriteBool(ref buffer.GetReference(), ConsiderLastMessageAsDelivered);
}
}
2 changes: 1 addition & 1 deletion src/ArtemisNetCoreClient/Framing/PacketType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ internal enum PacketType : sbyte
SessionCreateConsumerMessage = 40,
SessionAcknowledgeMessage = 41,
SessionCommitMessage = 43,
SessionRollbackMessage = 44,
SessionQueueQueryMessage = 45,
SessionBindingQueryMessage = 49,
SessionStart = 67,
Expand All @@ -26,5 +27,4 @@ internal enum PacketType : sbyte
CreateProducerMessage = -20,
RemoveProducerMessage = -21,
SessionBindingQueryResponseMessage = -22,

}
5 changes: 5 additions & 0 deletions src/ArtemisNetCoreClient/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public interface ISession : IAsyncDisposable
/// Commit the current transaction (any pending sends and acks).
/// </summary>
Task CommitAsync(CancellationToken cancellationToken);

/// <summary>
/// Rollback the current transaction (any pending sends and acks).
/// </summary>
Task RollbackAsync(CancellationToken cancellationToken);
}
26 changes: 26 additions & 0 deletions src/ArtemisNetCoreClient/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,32 @@ public async Task CommitAsync(CancellationToken cancellationToken)
}
}

public async Task RollbackAsync(CancellationToken cancellationToken)
{
var request = new RollbackMessage
{
ConsiderLastMessageAsDelivered = false
};

await _lock.WaitAsync(cancellationToken);
try
{
var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
_ = _completionSources.TryAdd(-1, tcs);
connection.Send(request, ChannelId);
await tcs.Task;
}
catch (Exception)
{
_completionSources.TryRemove(-1, out _);
throw;
}
finally
{
_lock.Release();
}
}

internal ValueTask RemoveProducerAsync(int producerId)
{
var request = new RemoveProducerMessage
Expand Down
53 changes: 52 additions & 1 deletion test/ArtemisNetCoreClient.Tests/SessionSpec.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using ActiveMQ.Artemis.Core.Client.Exceptions;
using ActiveMQ.Artemis.Core.Client.Tests.Utils;
using NScenario;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -183,7 +184,6 @@ public async Task Should_create_multiple_producers_using_the_same_session()
Address = addressName
}, testFixture.CancellationToken);
}


[Fact]
public async Task Should_delete_queue()
Expand Down Expand Up @@ -225,4 +225,55 @@ public async Task Should_not_delete_queue_when_it_does_not_exist()
var exception = await Assert.ThrowsAsync<ActiveMQNonExistentQueueException>(() => session.DeleteQueueAsync(queueName, testFixture.CancellationToken));
Assert.Equal(ActiveMQExceptionType.QueueDoesNotExist, exception.Type);
}

[Fact]
public async Task Should_rollback_pending_sends()
{
await using var testFixture = await TestFixture.CreateAsync(testOutputHelper);
var scenario = TestScenarioFactory.Default(new XUnitOutputAdapter(testOutputHelper));

await using var connection = await testFixture.CreateConnectionAsync();
await using var session = await connection.CreateSessionAsync(new SessionConfiguration
{
AutoCommitSends = false
}, testFixture.CancellationToken);

var (addressName, queueName) = await scenario.Step("Create address and queue", async () =>
{
var addressName = await testFixture.CreateAddressAsync(RoutingType.Anycast);
var queueName = await testFixture.CreateQueueAsync(addressName, RoutingType.Anycast);
return (addressName, queueName);
});

await using var producer = await scenario.Step("Create producer", async () =>
{
return await session.CreateProducerAsync(new ProducerConfiguration
{
Address = addressName,
RoutingType = RoutingType.Anycast
}, testFixture.CancellationToken);
});

await scenario.Step("Send message", async () =>
{
await producer.SendMessageAsync(new Message
{
Body = "test_payload"u8.ToArray()
}, testFixture.CancellationToken);
});

await scenario.Step("Rollback transaction", async () =>
{
await session.RollbackAsync(testFixture.CancellationToken);
});

await scenario.Step("Confirm that the queue is empty", async () =>
{
var queueInfo = await session.GetQueueInfoAsync(queueName, testFixture.CancellationToken);
Assert.NotNull(queueInfo);
Assert.Equal(0, queueInfo.MessageCount);
});
}

// TODO: Add test for Rollback with pending acks
}

0 comments on commit 5eb9cd1

Please sign in to comment.