From 15834b6f86740b1019b785c06ce74e774a89e121 Mon Sep 17 00:00:00 2001 From: Richard Pringle Date: Tue, 31 Dec 2024 22:50:47 +0800 Subject: [PATCH] #356 Abandon/dead letter message Signed-off-by: Richard Pringle --- docs/intro.md | 48 +++++++++- docs/intro.t.md | 48 +++++++++- .../Consumer/ISqsConsumerErrorHandler.cs | 2 +- .../Consumer/SqsBaseConsumer.cs | 5 + .../Consumer/EhPartitionConsumer.cs | 5 + .../Consumer/IEventHubConsumerErrorHandler.cs | 2 +- .../Consumer/AsbBaseConsumer.cs | 54 +++++++---- .../IServiceBusConsumerErrorHandler.cs | 2 +- .../Consumer/IKafkaConsumerErrorHandler.cs | 2 +- .../Consumer/KafkaPartitionConsumer.cs | 5 + .../Consumers/IMemoryConsumerErrorHandler.cs | 2 +- .../MemoryMessageBus.cs | 5 + .../IMqttConsumerErrorHandler.cs | 2 +- .../INatsConsumerErrorHandler.cs | 2 +- .../RabbitMqConsumerBuilderExtensions.cs | 16 +++- .../Config/RabbitMqHasProviderExtensions.cs | 2 +- .../RabbitMqMessageBusBuilderExtensions.cs | 4 +- .../Config/RabbitMqProperties.cs | 4 +- .../IRabbitMqConsumerErrorHandler.cs | 2 +- ...RabbitMqAutoAcknowledgeMessageProcessor.cs | 50 +++++++--- .../Consumers/RabbitMqConsumer.cs | 4 +- .../Consumers/RabbitMqResponseConsumer.cs | 24 +++-- ...itMqMessageBusSettingsValidationService.cs | 3 +- .../Consumers/IRedisConsumerErrorHandler.cs | 2 +- .../Consumers/RedisListCheckerConsumer.cs | 5 + .../Consumers/RedisTopicConsumer.cs | 5 + .../ErrorHandling/ConsumerErrorHandler.cs | 7 +- .../ConsumerErrorHandlerResult.cs | 27 +++--- .../ConcurrentMessageProcessorDecorator.cs | 8 +- .../MessageProcessors/IMessageHandler.cs | 2 +- .../MessageProcessors/IMessageProcessor.cs | 3 +- .../MessageProcessors/MessageHandler.cs | 18 ++-- .../MessageProcessors/MessageProcessor.cs | 9 +- .../MessageProcessors/ProcessResult.cs | 9 ++ .../ResponseMessageProcessor.cs | 9 +- .../ServiceBusMessageBusIt.cs | 91 +++++++++++++++++++ .../ConcurrentMessageProcessorQueueTests.cs | 2 +- .../Consumers/MessageProcessorQueueTests.cs | 2 +- ...tMqAutoAcknowledgeMessageProcessorTests.cs | 37 +++++--- .../IntegrationTest/BaseIntegrationTest.cs | 13 +++ ...ConcurrentMessageProcessorDecoratorTest.cs | 6 +- 41 files changed, 428 insertions(+), 120 deletions(-) create mode 100644 src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs diff --git a/docs/intro.md b/docs/intro.md index 0746dfc1..984d8886 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -1089,10 +1089,11 @@ The returned `ConsumerErrorHandlerResult` object is used to override the executi | Result | Description | | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** | | Failure | The message failed to be processed and should be returned to the queue | | Success | The pipeline must treat the message as having been processed successfully | | SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) | -| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) | +| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) | To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework: @@ -1119,8 +1120,35 @@ Transport plugins provide specialized error handling interfaces. Examples includ This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized. -Sample retry with exponential back-off (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation): + +### Abandon +#### Azure Service Bus +The Azure Service Bus transport has full support for abandoning messages to the dead letter queue. + +#### RabbitMQ +Abandon will issue a `Nack` with `requeue: false`. + +#### Other transports +No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown. + +### Failure +#### RabbitMQ +While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler. + +Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions. +```cs +.Handle(x => x + .Queue("echo-request-handler") + .ExchangeBinding("test-echo") + .DeadLetterExchange("echo-request-handler-dlq") + // requeue a message on failure + .RequeueOnFailure() + .WithHandler()) +``` + +### Example usage +Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation): ```cs public class RetryHandler : ConsumerErrorHandler { @@ -1128,6 +1156,11 @@ public class RetryHandler : ConsumerErrorHandler public override async Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) { + if (!IsTranientException(exception)) + { + return Abandon(); + } + if (attempts < 3) { var delay = (attempts * 1000) + (_random.Next(1000) - 500); @@ -1137,9 +1170,18 @@ public class RetryHandler : ConsumerErrorHandler return Failure(); } + + private static bool IsTransientException(Exception exception) + { + while (exception is not SqlException && exception.InnerException != null) + { + exception = exception.InnerException; + } + + return exception is SqlException { Number: -2 or 1205 }; // Timeout or deadlock + } } ``` - ## Logging SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions): diff --git a/docs/intro.t.md b/docs/intro.t.md index 7e1e4f79..6d64a390 100644 --- a/docs/intro.t.md +++ b/docs/intro.t.md @@ -1067,10 +1067,11 @@ The returned `ConsumerErrorHandlerResult` object is used to override the executi | Result | Description | | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** | | Failure | The message failed to be processed and should be returned to the queue | | Success | The pipeline must treat the message as having been processed successfully | | SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) | -| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) | +| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) | To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework: @@ -1097,8 +1098,35 @@ Transport plugins provide specialized error handling interfaces. Examples includ This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized. -Sample retry with exponential back-off (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation): + +### Abandon +#### Azure Service Bus +The Azure Service Bus transport has full support for abandoning messages to the dead letter queue. + +#### RabbitMQ +Abandon will issue a `Nack` with `requeue: false`. + +#### Other transports +No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown. + +### Failure +#### RabbitMQ +While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler. + +Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions. +```cs +.Handle(x => x + .Queue("echo-request-handler") + .ExchangeBinding("test-echo") + .DeadLetterExchange("echo-request-handler-dlq") + // requeue a message on failure + .RequeueOnFailure() + .WithHandler()) +``` + +### Example usage +Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation): ```cs public class RetryHandler : ConsumerErrorHandler { @@ -1106,6 +1134,11 @@ public class RetryHandler : ConsumerErrorHandler public override async Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) { + if (!IsTranientException(exception)) + { + return Abandon(); + } + if (attempts < 3) { var delay = (attempts * 1000) + (_random.Next(1000) - 500); @@ -1115,9 +1148,18 @@ public class RetryHandler : ConsumerErrorHandler return Failure(); } + + private static bool IsTransientException(Exception exception) + { + while (exception is not SqlException && exception.InnerException != null) + { + exception = exception.InnerException; + } + + return exception is SqlException { Number: -2 or 1205 }; // Timeout or deadlock + } } ``` - ## Logging SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions): diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs index c4221549..d7cb3e57 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface ISqsConsumerErrorHandler : IConsumerErrorHandler; -public abstract class SqsConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class SqsConsumerErrorHandler : ConsumerErrorHandler, ISqsConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs index 95818eb7..b5fec9ad 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs @@ -116,6 +116,11 @@ protected async Task Run() .ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, x.Value)); var r = await MessageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false); + if (r.Result == ProcessResult.Abandon) + { + throw new NotSupportedException("Transport does not support abandoning messages"); + } + if (r.Exception != null) { Logger.LogError(r.Exception, "Message processing error - Queue: {Queue}, MessageId: {MessageId}", Path, message.MessageId); diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs index f84f7a79..853b12da 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs @@ -42,6 +42,11 @@ public async Task ProcessEventAsync(ProcessEventArgs args) var headers = GetHeadersFromTransportMessage(args.Data); var r = await MessageProcessor.ProcessMessage(args.Data, headers, cancellationToken: args.CancellationToken).ConfigureAwait(false); + if (r.Result == ProcessResult.Abandon) + { + throw new NotSupportedException("Transport does not support abandoning messages"); + } + if (r.Exception != null) { // Note: The OnMessageFaulted was called at this point by the MessageProcessor. diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs index 335f02bb..978bcbea 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IEventHubConsumerErrorHandler : IConsumerErrorHandler; -public abstract class EventHubConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class EventHubConsumerErrorHandler : ConsumerErrorHandler, IEventHubConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs index 9154d6d8..daba0eb8 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs @@ -137,18 +137,23 @@ private Task ServiceBusSessionProcessor_SessionClosingAsync(ProcessSessionEventA } private Task ServiceBusSessionProcessor_ProcessMessageAsync(ProcessSessionMessageEventArgs args) - => ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.CancellationToken); + => ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.DeadLetterMessageAsync, args.CancellationToken); private Task ServiceBusSessionProcessor_ProcessErrorAsync(ProcessErrorEventArgs args) => ProcessErrorAsyncInternal(args.Exception, args.ErrorSource); protected Task ServiceBusProcessor_ProcessMessagesAsync(ProcessMessageEventArgs args) - => ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.CancellationToken); + => ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.DeadLetterMessageAsync, args.CancellationToken); protected Task ServiceBusProcessor_ProcessErrorAsync(ProcessErrorEventArgs args) => ProcessErrorAsyncInternal(args.Exception, args.ErrorSource); - protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage message, Func completeMessage, Func, CancellationToken, Task> abandonMessage, CancellationToken token) + protected async Task ProcessMessageAsyncInternal( + ServiceBusReceivedMessage message, + Func completeMessage, + Func, CancellationToken, Task> abandonMessage, + Func deadLetterMessage, + CancellationToken token) { // Process the message. Logger.LogDebug("Received message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); @@ -160,29 +165,38 @@ protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage messa // to avoid unnecessary exceptions. Logger.LogDebug("Abandon message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); await abandonMessage(message, null, token).ConfigureAwait(false); - return; } var r = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, cancellationToken: token).ConfigureAwait(false); - if (r.Exception != null) + switch (r.Result) { - Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); - - var messageProperties = new Dictionary - { - // Set the exception message - ["SMB.Exception"] = r.Exception.Message - }; - await abandonMessage(message, messageProperties, token).ConfigureAwait(false); - - return; + case ProcessResult.Success: + // Complete the message so that it is not received again. + // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default). + Logger.LogDebug("Complete message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); + await completeMessage(message, token).ConfigureAwait(false); + return; + + case ProcessResult.Abandon: + Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); + await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false); + return; + + case ProcessResult.Fail: + var messageProperties = new Dictionary(); + { + // Set the exception message + messageProperties.Add("SMB.Exception", r.Exception.Message); + } + + Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); + await abandonMessage(message, messageProperties, token).ConfigureAwait(false); + return; + + default: + throw new NotImplementedException(); } - - // Complete the message so that it is not received again. - // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default). - Logger.LogDebug("Complete message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); - await completeMessage(message, token).ConfigureAwait(false); } protected Task ProcessErrorAsyncInternal(Exception exception, ServiceBusErrorSource errorSource) diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs index a8ad8166..3e963ae2 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IServiceBusConsumerErrorHandler : IConsumerErrorHandler; -public abstract class ServiceBusConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class ServiceBusConsumerErrorHandler : ConsumerErrorHandler, IServiceBusConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs index a7f391bf..684ad008 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IKafkaConsumerErrorHandler : IConsumerErrorHandler; -public abstract class KafkaConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class KafkaConsumerErrorHandler : ConsumerErrorHandler, IKafkaConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs index d4b73d02..0474acab 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs @@ -115,6 +115,11 @@ public async Task OnMessage(ConsumeResult message) } var r = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false); + if (r.Result == ProcessResult.Abandon) + { + throw new NotSupportedException("Transport does not support abandoning messages"); + } + if (r.Exception != null) { // The IKafkaConsumerErrorHandler and OnMessageFaulted was called at this point by the MessageProcessor. diff --git a/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs index f235ab2c..9f02e618 100644 --- a/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IMemoryConsumerErrorHandler : IConsumerErrorHandler; -public abstract class MemoryConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class MemoryConsumerErrorHandler : ConsumerErrorHandler, IMemoryConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index 20b36e1a..ed5d7e74 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -158,6 +158,11 @@ private async Task ProduceInternal(object me var serviceProvider = targetBus?.ServiceProvider ?? Settings.ServiceProvider; // Execute the message processor in synchronous manner var r = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, currentServiceProvider: serviceProvider, cancellationToken: cancellationToken); + if (r.Result == ProcessResult.Abandon) + { + throw new NotSupportedException("Transport does not support abandoning messages"); + } + if (r.Exception != null) { // We want to pass the same exception to the sender as it happened in the handler/consumer diff --git a/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs index 9bef2034..6fcf2bba 100644 --- a/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Mqtt/IMqttConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IMqttConsumerErrorHandler : IConsumerErrorHandler; -public abstract class MqttConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class MqttConsumerErrorHandler : ConsumerErrorHandler, IMqttConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs index 8867a961..ad1f8833 100644 --- a/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface INatsConsumerErrorHandler : IConsumerErrorHandler; -public abstract class NatsConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class NatsConsumerErrorHandler : ConsumerErrorHandler, INatsConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs index baec5ffd..e7b368fd 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs @@ -118,5 +118,19 @@ public static TConsumerBuilder AcknowledgementMode(this TConsu builder.ConsumerSettings.Properties[RabbitMqProperties.MessageAcknowledgementMode] = mode; return builder; } -} + /// + /// Requeues a message on failure. Abandoned messages will not be requeued. + /// This may lead to an infinite loop if the reason for the failure is not transient. Responsibility lies with the developer to ensure that this flow does not occur. + /// + /// + /// + /// + /// + public static TConsumerBuilder RequeueOnFailure(this TConsumerBuilder builder, bool state = true) + where TConsumerBuilder : AbstractConsumerBuilder + { + builder.ConsumerSettings.Properties[RabbitMqProperties.ReqeueOnFailure] = state; + return builder; + } +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs index 85e8c34e..dc2f6048 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqHasProviderExtensions.cs @@ -14,6 +14,6 @@ public static string GetQueueName(this AbstractConsumerSettings c) public static string GetBindingRoutingKey(this AbstractConsumerSettings c, HasProviderExtensions settings = null) => c.GetOrDefault(RabbitMqProperties.BindingRoutingKey, settings, null); - public static string GetExchageType(this ProducerSettings p, HasProviderExtensions settings = null) + public static string GetExchangeType(this ProducerSettings p, HasProviderExtensions settings = null) => p.GetOrDefault(RabbitMqProperties.ExchangeType, settings, null); } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusBuilderExtensions.cs index 3a4aa7f4..8af865a8 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageBusBuilderExtensions.cs @@ -14,8 +14,8 @@ public static MessageBusBuilder WithProviderRabbitMQ(this MessageBusBuilder mbb, if (mbb == null) throw new ArgumentNullException(nameof(mbb)); if (configure == null) throw new ArgumentNullException(nameof(configure)); - var providerSettings = mbb.Settings.GetOrCreate(RabbitMqProperties.ProvderSettings, () => new RabbitMqMessageBusSettings()); - + var providerSettings = mbb.Settings.GetOrCreate(RabbitMqProperties.ProviderSettings, () => new RabbitMqMessageBusSettings()); + configure(providerSettings); return mbb.WithProvider(settings => new RabbitMqMessageBus(settings, providerSettings)); diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs index ed74d444..1e32d821 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs @@ -28,7 +28,9 @@ static internal class RabbitMqProperties public static readonly string Message = $"RabbitMQ_{nameof(Message)}"; - public static readonly string ProvderSettings = $"RabbitMQ_{nameof(ProvderSettings)}"; + public static readonly string ProviderSettings = $"RabbitMQ_{nameof(ProviderSettings)}"; public static readonly string MessageAcknowledgementMode = $"RabbitMQ_{nameof(MessageAcknowledgementMode)}"; + + public static readonly string ReqeueOnFailure = $"RabbitMQ_{nameof(ReqeueOnFailure)}"; } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs index 2a48eab3..4bac0cde 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IRabbitMqConsumerErrorHandler : IConsumerErrorHandler; -public abstract class RabbitMqConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class RabbitMqConsumerErrorHandler : ConsumerErrorHandler, IRabbitMqConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs index 9cad553e..439aef92 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs @@ -7,17 +7,33 @@ /// /// /// -internal sealed class RabbitMqAutoAcknowledgeMessageProcessor(IMessageProcessor target, - ILogger logger, - RabbitMqMessageAcknowledgementMode acknowledgementMode, - IRabbitMqConsumer consumer) - : IMessageProcessor, IDisposable +internal sealed class RabbitMqAutoAcknowledgeMessageProcessor : IMessageProcessor, IDisposable { - public IReadOnlyCollection ConsumerSettings => target.ConsumerSettings; + private readonly IMessageProcessor _target; + private readonly ILogger _logger; + private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode; + private readonly IRabbitMqConsumer _consumer; + private readonly bool _requeueOnFailure; + + public RabbitMqAutoAcknowledgeMessageProcessor( + IMessageProcessor target, + ILogger logger, + RabbitMqMessageAcknowledgementMode acknowledgementMode, + IRabbitMqConsumer consumer) + { + _target = target; + _logger = logger; + _acknowledgementMode = acknowledgementMode; + _consumer = consumer; + + _requeueOnFailure = _target.ConsumerSettings?.All(x => x.GetOrDefault(RabbitMqProperties.ReqeueOnFailure, false)) ?? false; + } + + public IReadOnlyCollection ConsumerSettings => _target.ConsumerSettings; public void Dispose() { - if (target is IDisposable targetDisposable) + if (_target is IDisposable targetDisposable) { targetDisposable.Dispose(); } @@ -25,22 +41,26 @@ public void Dispose() public async Task ProcessMessage(BasicDeliverEventArgs transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) { - var r = await target.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: cancellationToken); - - if (acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade) + var r = await _target.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: cancellationToken); + if (_acknowledgementMode == RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade) { // Acknowledge after processing - var confirmOption = r.Exception != null - ? RabbitMqMessageConfirmOptions.Nack // NAck after processing when message fails (unless the user already acknowledged in any way). - : RabbitMqMessageConfirmOptions.Ack; // Acknowledge after processing + var confirmOption = r.Result switch + { + ProcessResult.Abandon => RabbitMqMessageConfirmOptions.Nack, // NAck after processing when message fails with non-transient exception (unless the user already acknowledged in any way). + ProcessResult.Fail when (_requeueOnFailure) => RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue, // Re-queue after processing on transient failure + ProcessResult.Fail when (!_requeueOnFailure) => RabbitMqMessageConfirmOptions.Nack, // Fail after processing failure (no re-queue) + ProcessResult.Success => RabbitMqMessageConfirmOptions.Ack, // Acknowledge after processing + _ => throw new NotImplementedException() + }; - consumer.ConfirmMessage(transportMessage, confirmOption, consumerContextProperties); + _consumer.ConfirmMessage(transportMessage, confirmOption, consumerContextProperties); } if (r.Exception != null) { // We rely on the IMessageProcessor to execute the ConsumerErrorHandler, but if it's not registered in the DI, it fails, or there is another fatal error then the message will be lost. - logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, consumer.QueueName, transportMessage, transportMessage.DeliveryTag); + _logger.LogError(r.Exception, "Exchange {Exchange} - Queue {Queue}: Error processing message {Message}, delivery tag {DeliveryTag}", transportMessage.Exchange, _consumer.QueueName, transportMessage, transportMessage.DeliveryTag); } return r; } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs index f29c3e72..34150846 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs @@ -36,7 +36,7 @@ IMessageProcessor CreateMessageProcessor(IEnumerable)); @@ -44,7 +44,7 @@ IMessageProcessor CreateMessageProcessor(IEnumerable x.Instances); - // For a given rabbit channel, there is only 1 task that dispatches messages. We want to be be able to let each SMB consume process within its own task (1 or more) + // For a given rabbit channel, there is only 1 task that dispatches messages. We want to be able to let each SMB consume process within its own task (1 or more) messageProcessor = new ConcurrentMessageProcessorDecorator(instances, loggerFactory, messageProcessor); return messageProcessor; diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs index 59ed5c66..3ce14894 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs @@ -3,6 +3,7 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer { private readonly IMessageProcessor _messageProcessor; + private readonly bool _requeueOnFailure; protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; @@ -18,19 +19,30 @@ public RabbitMqResponseConsumer( : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) { _messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider); + _requeueOnFailure = requestResponseSettings.GetOrDefault(RabbitMqProperties.ReqeueOnFailure, false); } protected override async Task OnMessageReceived(Dictionary messageHeaders, BasicDeliverEventArgs transportMessage) { var r = await _messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, cancellationToken: CancellationToken); - if (r.Exception == null) + switch (r.Result) { - AckMessage(transportMessage); - } - else - { - NackMessage(transportMessage, requeue: false); + case ProcessResult.Abandon: + NackMessage(transportMessage, requeue: false); + break; + + case ProcessResult.Fail: + NackMessage(transportMessage, requeue: _requeueOnFailure); + break; + + case ProcessResult.Success: + AckMessage(transportMessage); + break; + + default: + throw new NotImplementedException(); } + return r.Exception; } } diff --git a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettingsValidationService.cs b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettingsValidationService.cs index a20337c1..444d08dc 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettingsValidationService.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettingsValidationService.cs @@ -32,7 +32,7 @@ protected override void AssertProducer(ProducerSettings producerSettings) if (routingKeyProvider == null) { // Requirement for the routing key depends on the ExchangeType - var exchangeType = producerSettings.GetExchageType(ProviderSettings); + var exchangeType = producerSettings.GetExchangeType(ProviderSettings); if (exchangeType == global::RabbitMQ.Client.ExchangeType.Direct || exchangeType == global::RabbitMQ.Client.ExchangeType.Topic) { ThrowProducerFieldNotSet(producerSettings, nameof(RabbitMqProducerBuilderExtensions.RoutingKeyProvider), $"is neither provided on the producer for exchange {producerSettings.DefaultPath} nor a default provider exists at the bus level (check that .{nameof(RabbitMqProducerBuilderExtensions.RoutingKeyProvider)}() exists on the producer or bus level). Exchange type {exchangeType} requires the producer to has a routing key provider."); @@ -75,6 +75,5 @@ protected override void AssertRequestResponseSettings() ThrowRequestResponseFieldNotSet(nameof(RabbitMqConsumerBuilderExtensions.Queue)); } } - } } diff --git a/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs b/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs index d041b78b..e1272c10 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs @@ -2,4 +2,4 @@ public interface IRedisConsumerErrorHandler : IConsumerErrorHandler; -public abstract class RedisConsumerErrorHandler : ConsumerErrorHandler; \ No newline at end of file +public abstract class RedisConsumerErrorHandler : ConsumerErrorHandler, IRedisConsumerErrorHandler; \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs index f178cd80..1b33b4e4 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisListCheckerConsumer.cs @@ -74,6 +74,11 @@ protected async Task Run() var processor = queue.Processors[i]; var r = await processor.ProcessMessage(transportMessage, transportMessage.Headers, cancellationToken: CancellationToken).ConfigureAwait(false); + if (r.Result == ProcessResult.Abandon) + { + throw new NotSupportedException("Transport does not support abandoning messages"); + } + if (r.Exception != null) { Logger.LogError(r.Exception, "Error occurred while processing the list item on {Queue}", queue.Name); diff --git a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs index 77204dff..099af9bd 100644 --- a/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs +++ b/src/SlimMessageBus.Host.Redis/Consumers/RedisTopicConsumer.cs @@ -45,6 +45,11 @@ private async Task OnMessage(ChannelMessage m) var messageWithHeaders = (MessageWithHeaders)_envelopeSerializer.Deserialize(typeof(MessageWithHeaders), m.Message); var r = await _messageProcessor.ProcessMessage(messageWithHeaders, messageWithHeaders.Headers, cancellationToken: CancellationToken); + if (r.Result == ProcessResult.Abandon) + { + throw new NotSupportedException("Transport does not support abandoning messages"); + } + exception = r.Exception; } catch (Exception e) diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs index 94d584c7..f031335f 100644 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs @@ -7,7 +7,8 @@ public abstract class ConsumerErrorHandler : BaseConsumerErrorHandler, IConsu public abstract class BaseConsumerErrorHandler { - public static ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure; - public static ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry; - public static ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response); + public virtual ConsumerErrorHandlerResult Abandon() => ConsumerErrorHandlerResult.Abandon; + public virtual ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure; + public virtual ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry; + public virtual ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs index 7b914fee..fda3910e 100644 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs @@ -4,40 +4,41 @@ public record ConsumerErrorHandlerResult { private static readonly object _noResponse = new(); - private ConsumerErrorHandlerResult(ConsumerErrorHandlerResultEnum result, object response = null) + private ConsumerErrorHandlerResult(ProcessResult result, object response = null) { Result = result; Response = response ?? _noResponse; } - public ConsumerErrorHandlerResultEnum Result { get; private set; } + public ProcessResult Result { get; private set; } public object Response { get; private set; } public bool HasResponse => !ReferenceEquals(Response, _noResponse); + /// + /// the message should be abandoned and placed in the dead letter queue. + /// + /// + /// This feature is not supported by every transport. + /// + public static readonly ConsumerErrorHandlerResult Abandon = new(ProcessResult.Abandon); + /// /// The message should be placed back into the queue. /// - public static readonly ConsumerErrorHandlerResult Failure = new(ConsumerErrorHandlerResultEnum.Fail); + public static readonly ConsumerErrorHandlerResult Failure = new(ProcessResult.Fail); /// /// The message processor should evaluate the message as having been processed successfully. /// - public static readonly ConsumerErrorHandlerResult Success = new(ConsumerErrorHandlerResultEnum.Success); + public static readonly ConsumerErrorHandlerResult Success = new(ProcessResult.Success); /// /// The message processor should evaluate the message as having been processed successfully and use the specified fallback response for the or . /// - public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new(ConsumerErrorHandlerResultEnum.Success, response); + public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new(ProcessResult.Success, response); /// /// Retry processing the message without placing it back in the queue. /// - public static readonly ConsumerErrorHandlerResult Retry = new(ConsumerErrorHandlerResultEnum.Retry); + public static readonly ConsumerErrorHandlerResult Retry = new(ProcessResult.Retry); } - -public enum ConsumerErrorHandlerResultEnum -{ - Fail, - Retry, - Success -} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs index b02b64fd..1ccebe3a 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs @@ -1,7 +1,7 @@ -namespace SlimMessageBus.Host; +namespace SlimMessageBus.Host; /// -/// Decorator profor that increases the amount of messages being concurrentlycessed. +/// Decorator for that increases the amount of messages being concurrently processed. /// The expectation is that will be executed synchronously (in sequential order) by the caller on which we want to increase amount of concurrent transportMessage being processed. /// /// @@ -58,7 +58,7 @@ public async Task ProcessMessage(TMessage transportMessage { // report the last exception _lastException = null; - return new(e, _lastExceptionSettings, null); + return new(ProcessResult.Fail, e, _lastExceptionSettings, null); } Interlocked.Increment(ref _pendingCount); @@ -67,7 +67,7 @@ public async Task ProcessMessage(TMessage transportMessage _ = ProcessInBackground(transportMessage, messageHeaders, currentServiceProvider, consumerContextProperties, cancellationToken); // Not exception - we don't know yet - return new(null, null, null); + return new(ProcessResult.Success, null, null, null); } /// diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageHandler.cs index ef8c3d20..cf5381b7 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageHandler.cs @@ -2,6 +2,6 @@ public interface IMessageHandler { - Task<(object Response, Exception ResponseException, string RequestId)> DoHandle(object message, IReadOnlyDictionary messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, object transportMessage = null, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default); + Task<(ProcessResult Result, object Response, Exception ResponseException, string RequestId)> DoHandle(object message, IReadOnlyDictionary messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, object transportMessage = null, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default); Task ExecuteConsumer(object message, IConsumerContext consumerContext, IMessageTypeConsumerInvokerSettings consumerInvoker, Type responseType); } diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageProcessor.cs index d6ce7240..891090f0 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/IMessageProcessor.cs @@ -1,10 +1,11 @@ namespace SlimMessageBus.Host; -public readonly struct ProcessMessageResult(Exception exception, AbstractConsumerSettings consumerSettings, object response) +public readonly struct ProcessMessageResult(ProcessResult result, Exception exception, AbstractConsumerSettings consumerSettings, object response) { public Exception Exception { get; init; } = exception; public AbstractConsumerSettings ConsumerSettings { get; init; } = consumerSettings; public object Response { get; init; } = response; + public ProcessResult Result { get; init; } = result; } public interface IMessageProcessor diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs index 21717605..c1dfd997 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs @@ -58,7 +58,7 @@ public MessageHandler( } } - public async Task<(object Response, Exception ResponseException, string RequestId)> DoHandle(object message, IReadOnlyDictionary messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, object transportMessage = null, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) + public async Task<(ProcessResult Result, object Response, Exception ResponseException, string RequestId)> DoHandle(object message, IReadOnlyDictionary messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, object transportMessage = null, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) { var messageType = message.GetType(); @@ -89,7 +89,7 @@ public MessageHandler( { // ToDo: Call interceptor // Do not process the expired message - return (ResponseForExpiredRequest, null, requestId); + return (ProcessResult.Success, ResponseForExpiredRequest, null, requestId); } var messageBusTarget = new MessageBusProxy(MessageBus, messageScope.ServiceProvider); @@ -103,23 +103,25 @@ public MessageHandler( try { var response = await DoHandleInternal(message, consumerInvoker, messageType, hasResponse, responseType, messageScope, consumerContext).ConfigureAwait(false); - return (response, null, requestId); + return (ProcessResult.Success, response, null, requestId); } catch (Exception ex) { attempts++; var handleErrorResult = await DoHandleError(message, messageType, messageScope, consumerContext, ex, attempts, cancellationToken).ConfigureAwait(false); - if (handleErrorResult.Result != ConsumerErrorHandlerResultEnum.Retry) + if (handleErrorResult.Result == ProcessResult.Retry) { - var exception = handleErrorResult.Result != ConsumerErrorHandlerResultEnum.Success ? ex : null; - var response = handleErrorResult.HasResponse ? handleErrorResult.Response : null; - return (response, exception, requestId); + continue; } + + var exception = handleErrorResult.Result != ProcessResult.Success ? ex : null; + var response = handleErrorResult.HasResponse ? handleErrorResult.Response : null; + return (handleErrorResult.Result, response, exception, requestId); } } catch (Exception e) { - return (null, e, requestId); + return (ProcessResult.Fail, null, e, requestId); } finally { diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index 46e4fa8f..6c96d0e5 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -1,5 +1,7 @@ namespace SlimMessageBus.Host; +using System.Diagnostics; + /// /// Implementation of that performs orchestration around processing of a new message using an instance of the declared consumer ( or interface). /// @@ -66,6 +68,7 @@ protected override ConsumerContext CreateConsumerContext(IReadOnlyDictionary ProcessMessage(TTransportMessage transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties = null, IServiceProvider currentServiceProvider = null, CancellationToken cancellationToken = default) { IMessageTypeConsumerInvokerSettings lastConsumerInvoker = null; + var result = ProcessResult.Success; Exception lastException = null; object lastResponse = null; @@ -94,7 +97,9 @@ public async virtual Task ProcessMessage(TTransportMessage break; } - (lastResponse, lastException, var requestId) = await DoHandle(message, messageHeaders, consumerInvoker, transportMessage, consumerContextProperties, currentServiceProvider, cancellationToken).ConfigureAwait(false); + (result, lastResponse, lastException, var requestId) = await DoHandle(message, messageHeaders, consumerInvoker, transportMessage, consumerContextProperties, currentServiceProvider, cancellationToken).ConfigureAwait(false); + + Debug.Assert(result != ProcessResult.Retry); if (consumerInvoker.ParentSettings.ConsumerMode == ConsumerMode.RequestResponse && _responseProducer != null) { @@ -136,7 +141,7 @@ public async virtual Task ProcessMessage(TTransportMessage _logger.LogDebug(e, "Processing of the message {TransportMessage} failed", transportMessage); lastException = e; } - return new(lastException, lastException != null ? lastConsumerInvoker?.ParentSettings : null, lastResponse); + return new(result, lastException, lastException != null ? lastConsumerInvoker?.ParentSettings : null, lastResponse); } protected Type GetMessageType(IReadOnlyDictionary headers) diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs new file mode 100644 index 00000000..6e100377 --- /dev/null +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host; + +public enum ProcessResult +{ + Abandon, + Fail, + Retry, + Success +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index 10242fc2..69ee4ff2 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -1,8 +1,6 @@ namespace SlimMessageBus.Host; -public abstract class ResponseMessageProcessor -{ -} +public abstract class ResponseMessageProcessor; /// /// The implementation that processes the responses arriving to the bus. @@ -48,7 +46,9 @@ public Task ProcessMessage(TTransportMessage transportMess // We can only continue and process all messages in the lease ex = e; } - return Task.FromResult(new ProcessMessageResult(ex, _requestResponseSettings, null)); + + var result = ex == null ? ProcessResult.Success : ProcessResult.Fail; + return Task.FromResult(new ProcessMessageResult(result, ex, _requestResponseSettings, null)); } /// @@ -117,5 +117,4 @@ private Exception OnResponseArrived(TTransportMessage transportMessage, string p return null; } - } diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs index db9d3e56..edeb6aa5 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs @@ -6,12 +6,14 @@ namespace SlimMessageBus.Host.AzureServiceBus.Test; using System.Runtime.CompilerServices; using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using SlimMessageBus.Host; +using SlimMessageBus.Host.Interceptor; using SlimMessageBus.Host.Serialization.Json; using SlimMessageBus.Host.Test.Common.IntegrationTest; @@ -113,6 +115,95 @@ public async Task BasicPubSubOnQueue(bool bulkProduce) await BasicPubSub(1, bulkProduce: bulkProduce); } + [Fact] + public async Task AbandonedMessage_DeliveredToDeadLetterQueue() + { + // arrange + var queue = QueueName(); + + AddTestServices((services, configuration) => + { + services.AddTransient(sp => + { + var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + return ActivatorUtilities.CreateInstance(sp, connectionString); + }); + + services.AddTransient(sp => + { + var connectionString = Secrets.Service.PopulateSecrets(configuration["Azure:ServiceBus"]); + return ActivatorUtilities.CreateInstance(sp, connectionString); + }); + + services.AddScoped(typeof(IConsumerInterceptor<>), typeof(AbandonPingMessageInterceptor<>)); + services.AddScoped(typeof(IServiceBusConsumerErrorHandler<>), typeof(AbandonMessageConsumerErrorHandler<>)); + }); + + AddBusConfiguration(mbb => + { + mbb + .Produce(x => x.DefaultQueue(queue).WithModifier(MessageModifier)) + .Consume(x => x + .Queue(queue) + .WithConsumer() + .Instances(20)); + }); + + var adminClient = ServiceProvider.GetRequiredService(); + var testMetric = ServiceProvider.GetRequiredService(); + var consumedMessages = ServiceProvider.GetRequiredService>(); + var client = ServiceProvider.GetRequiredService(); + var deadLetterReceiver = client.CreateReceiver($"{queue}/$DeadLetterQueue"); + + // act + var messageBus = MessageBus; + + var producedMessages = Enumerable + .Range(0, NumberOfMessages) + .Select(i => new PingMessage { Counter = i }) + .ToList(); + + await messageBus.Publish(producedMessages); + await consumedMessages.WaitUntilArriving(newMessagesTimeout: 5); + + // assert + // ensure number of instances of consumers created matches + testMetric.CreatedConsumerCount.Should().Be(NumberOfMessages); + consumedMessages.Count.Should().Be(NumberOfMessages); + + // all messages should be in the DLQ + var properties = await adminClient.GetQueueRuntimePropertiesAsync(queue); + properties.Value.ActiveMessageCount.Should().Be(0); + properties.Value.DeadLetterMessageCount.Should().Be(NumberOfMessages); + + // all messages should have been sent directly to the DLQ + var messages = await deadLetterReceiver.PeekMessagesAsync(NumberOfMessages); + messages.Count.Should().Be(NumberOfMessages); + foreach (var message in messages) + { + message.DeliveryCount.Should().Be(0); + message.ApplicationProperties["DeadLetterReason"].Should().Be(nameof(ApplicationException)); + } + } + + public class AbandonPingMessageInterceptor : IConsumerInterceptor + { + public async Task OnHandle(T message, Func> next, IConsumerContext context) + { + await next(); + var pingMessage = message as PingMessage; + throw new ApplicationException($"Abandon message {pingMessage.Counter:000} on path {context.Path}."); + } + } + + public class AbandonMessageConsumerErrorHandler : ServiceBusConsumerErrorHandler + { + public override Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + { + return Task.FromResult(Abandon()); + } + } + [Fact] public async Task BasicPubSubWithCustomConsumerOnQueue() { diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs index c548124b..f31aebdf 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/ConcurrentMessageProcessorQueueTests.cs @@ -20,7 +20,7 @@ public async Task When_Enqueue_Given_FourMessagesEnqueued_Then_ProcessMessageIsC static async Task ProcessMessageFake(object transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties, IServiceProvider currentServiceProvider, CancellationToken cancellationToken) { await Task.Delay(500, cancellationToken); - return new ProcessMessageResult(null, null, null); + return new ProcessMessageResult(ProcessResult.Success, null, null, null); } messageProcessor diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs index 05fa159f..eb3bfc66 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/Consumers/MessageProcessorQueueTests.cs @@ -18,7 +18,7 @@ public async Task When_Enqueue_Given_TwoMessagesEnqueued_Then_ProcessMessageIsCa static async Task ProcessMessageFake(object transportMessage, IReadOnlyDictionary messageHeaders, IDictionary consumerContextProperties, IServiceProvider currentServiceProvider, CancellationToken cancellationToken) { await Task.Delay(500, cancellationToken); - return new ProcessMessageResult(null, null, null); + return new ProcessMessageResult(ProcessResult.Success, null, null, null); } messageProcessor diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs index 5b1fd18b..43557685 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs @@ -4,16 +4,15 @@ public class RabbitMqAutoAcknowledgeMessageProcessorTests { - private readonly Mock> _targetMock; - private readonly Mock _targetDisposableMock; + private readonly Mock> _messageProcessorMock; + private readonly Mock _messageProcessorDisposableMock; private readonly Mock _consumerMock; private readonly BasicDeliverEventArgs _transportMessage; - private readonly RabbitMqAutoAcknowledgeMessageProcessor _subject; public RabbitMqAutoAcknowledgeMessageProcessorTests() { - _targetMock = new Mock>(); - _targetDisposableMock = _targetMock.As(); + _messageProcessorMock = new Mock>(); + _messageProcessorDisposableMock = _messageProcessorMock.As(); _consumerMock = new Mock(); _transportMessage = new BasicDeliverEventArgs @@ -21,29 +20,36 @@ public RabbitMqAutoAcknowledgeMessageProcessorTests() Exchange = "exchange", DeliveryTag = 1 }; - - _subject = new RabbitMqAutoAcknowledgeMessageProcessor(_targetMock.Object, NullLogger.Instance, RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade, _consumerMock.Object); } [Fact] public void When_Dispose_Then_CallsDisposeOnTarget() { // arrange + var subject = CreateSubject(); // act - _subject.Dispose(); + subject.Dispose(); // assert - _targetDisposableMock.Verify(x => x.Dispose(), Times.Once); + _messageProcessorDisposableMock.Verify(x => x.Dispose(), Times.Once); } - [Fact] - public async Task When_ProcessMessage_Then_AutoAcknowledge() + [Theory] + [InlineData(ProcessResult.Abandon, RabbitMqMessageConfirmOptions.Nack)] + [InlineData(ProcessResult.Fail, RabbitMqMessageConfirmOptions.Nack)] + [InlineData(ProcessResult.Success, RabbitMqMessageConfirmOptions.Ack)] + public async Task When_ProcessMessage_Then_AutoAcknowledge(ProcessResult processResult, RabbitMqMessageConfirmOptions expected) { // arrange + _messageProcessorMock + .Setup(x => x.ProcessMessage(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(new ProcessMessageResult(processResult, null, null, null))); + + var subject = CreateSubject(); // act - var result = await _subject.ProcessMessage( + var result = await subject.ProcessMessage( _transportMessage, new Dictionary(), null, @@ -53,8 +59,13 @@ public async Task When_ProcessMessage_Then_AutoAcknowledge() // assert _consumerMock.Verify(x => x.ConfirmMessage( _transportMessage, - RabbitMqMessageConfirmOptions.Ack, + expected, It.IsAny>(), It.IsAny()), Times.Once); } + + private RabbitMqAutoAcknowledgeMessageProcessor CreateSubject() + { + return new RabbitMqAutoAcknowledgeMessageProcessor(_messageProcessorMock.Object, NullLogger.Instance, RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade, _consumerMock.Object); + } } diff --git a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs index 9d5d1577..6fa82ba4 100644 --- a/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test.Common/IntegrationTest/BaseIntegrationTest.cs @@ -12,6 +12,7 @@ public abstract class BaseIntegrationTest : IAsyncLifetime { private readonly Lazy _serviceProvider; private Action messageBusBuilderAction = (mbb) => { }; + private Action testServicesBuilderAction = (services, configuration) => { }; private ILogger? _logger; protected ILogger Logger => _logger ??= ServiceProvider.GetRequiredService>(); @@ -45,6 +46,7 @@ protected BaseIntegrationTest(ITestOutputHelper output) services.AddSingleton(); SetupServices(services, Configuration); + ApplyTestServices(services, Configuration); return services.BuildServiceProvider(); }); @@ -62,7 +64,18 @@ protected void AddBusConfiguration(Action action) }; } + protected void AddTestServices(Action action) + { + var prevAction = testServicesBuilderAction; + testServicesBuilderAction = (services, configuration) => + { + prevAction(services, configuration); + action(services, configuration); + }; + } + protected void ApplyBusConfiguration(MessageBusBuilder mbb) => messageBusBuilderAction?.Invoke(mbb); + protected void ApplyTestServices(IServiceCollection services, IConfigurationRoot configuration) => testServicesBuilderAction?.Invoke(services, configuration); protected async Task EnsureConsumersStarted() { diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs index fabd0748..328f123a 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs @@ -41,7 +41,7 @@ public async Task When_WaitAll_Then_WaitsOnAllPendingMessageProcessToFinish(bool .Returns(async () => { await Task.Delay(TimeSpan.FromSeconds(1)); - return new(null, null, null); + return new(ProcessResult.Success, null, null, null); }); var subject = new ConcurrentMessageProcessorDecorator(1, NullLoggerFactory.Instance, _messageProcessorMock.Object); @@ -115,7 +115,7 @@ public async Task When_ProcessMessage_Given_NMessagesAndConcurrencySetToC_Then_N // Leaving critical section Interlocked.Decrement(ref currentSectionCount); - return new(null, null, null); + return new(ProcessResult.Success, null, null, null); }); // act @@ -147,7 +147,7 @@ public async Task When_ProcessMessage_Given_ExceptionHappensOnTarget_Then_Except _messageProcessorMock .Setup(x => x.ProcessMessage(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) - .ReturnsAsync(new ProcessMessageResult(exception, null, null)); + .ReturnsAsync(new ProcessMessageResult(ProcessResult.Fail, exception, null, null)); var msg = new SomeMessage(); var msgHeaders = new Dictionary();