From 0ffebc9ae5b8e4ca65bd3718471715a22e4b7740 Mon Sep 17 00:00:00 2001 From: Richard Pringle Date: Thu, 2 Jan 2025 14:03:02 +0800 Subject: [PATCH] #356 Consumer error response per transport Signed-off-by: Richard Pringle --- docs/intro.md | 70 ++++++++----------- docs/intro.t.md | 68 ++++++++---------- .../Consumer/SqsBaseConsumer.cs | 5 -- .../Consumer/EhPartitionConsumer.cs | 5 -- .../Consumer/AsbBaseConsumer.cs | 6 +- .../IServiceBusConsumerErrorHandler.cs | 15 +++- .../Consumer/KafkaPartitionConsumer.cs | 5 -- .../MemoryMessageBus.cs | 5 -- .../RabbitMqConsumerBuilderExtensions.cs | 15 ---- .../Config/RabbitMqProperties.cs | 2 - .../IRabbitMqConsumerErrorHandler.cs | 15 +++- ...RabbitMqAutoAcknowledgeMessageProcessor.cs | 10 +-- .../Consumers/RabbitMqResponseConsumer.cs | 12 ++-- .../Consumers/RedisListCheckerConsumer.cs | 5 -- .../Consumers/RedisTopicConsumer.cs | 5 -- .../Collections/RuntimeTypeCache.cs | 4 +- .../ErrorHandling/ConsumerErrorHandler.cs | 9 ++- .../ConsumerErrorHandlerResult.cs | 44 ------------ .../ErrorHandling/IConsumerErrorHandler.cs | 2 +- .../Consumer/ErrorHandling/ProcessResult.cs | 39 +++++++++++ .../ConcurrentMessageProcessorDecorator.cs | 2 +- .../MessageProcessors/MessageHandler.cs | 12 ++-- .../MessageProcessors/MessageProcessor.cs | 2 +- .../MessageProcessors/ProcessResult.cs | 9 --- .../ResponseMessageProcessor.cs | 2 +- src/SlimMessageBus.sln | 13 +--- .../ServiceBusMessageBusIt.cs | 14 ++-- .../MemoryMessageBusTests.cs | 4 +- ...tMqAutoAcknowledgeMessageProcessorTests.cs | 24 +++++-- .../IntegrationTests/RabbitMqMessageBusIt.cs | 6 +- ...ConcurrentMessageProcessorDecoratorTest.cs | 2 +- .../Consumer/MessageHandlerTest.cs | 4 +- 32 files changed, 188 insertions(+), 247 deletions(-) delete mode 100644 src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs create mode 100644 src/SlimMessageBus.Host/Consumer/ErrorHandling/ProcessResult.cs delete mode 100644 src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs diff --git a/docs/intro.md b/docs/intro.md index 984d8886..09cacc67 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -38,6 +38,8 @@ - [Order of Execution](#order-of-execution) - [Generic interceptors](#generic-interceptors) - [Error Handling](#error-handling) + - [Azure Service Bus](#azure-service-bus) + - [RabbitMQ](#rabbitmq) - [Logging](#logging) - [Debugging](#debugging) - [Provider specific functionality](#provider-specific-functionality) @@ -1081,15 +1083,14 @@ public interface IConsumerErrorHandler /// Exception that occurred during message processing. /// The number of times the message has been attempted to be processed. /// The error handling result. - Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); + Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); } ``` -The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline. +The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options. -| Result | Description | +| ProcessResult | Description | | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** | | Failure | The message failed to be processed and should be returned to the queue | | Success | The pipeline must treat the message as having been processed successfully | | SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) | @@ -1105,52 +1106,38 @@ services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabb services.AddTransient(typeof(IConsumerErrorHandler<>), typeof(CustomConsumerErrorHandler<>)); ``` -Transport plugins provide specialized error handling interfaces. Examples include: +Transport plugins provide specialized error handling interfaces with a default implementation that includes any additional `ProcessResult` options. Examples include: -- [IMemoryConsumerErrorHandler](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) -- [IRabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) -- [IKafkaConsumerErrorHandler](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) -- [IRedisConsumerErrorHandler](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) -- [INatsConsumerErrorHandler](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) -- [IServiceBusConsumerErrorHandler](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) -- [IEventHubConsumerErrorHandler](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) -- [ISqsConsumerErrorHandler](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) +| Interface | Implementation including reference to additional options (if any) | +| ---------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | +| [IMemoryConsumerErrorHandler](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | [MemoryConsumerErrorHandler](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | +| [IRabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | +| [IKafkaConsumerErrorHandler](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | [KafkaConsumerErrorHandler](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | +| [IRedisConsumerErrorHandler](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | [RedisConsumerErrorHandler](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | +| [INatsConsumerErrorHandler](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | [NatsConsumerErrorHandler](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | +| [IServiceBusConsumerErrorHandler](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | [ServiceBusConsumerErrorHandler](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | +| [IEventHubConsumerErrorHandler](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | [EventHubConsumerErrorHandler](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | +| [ISqsConsumerErrorHandler](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | [SqsConsumerErrorHandler](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | > The message processing pipeline will always attempt to use the transport-specific error handler (e.g., `IMemoryConsumerErrorHandler`) first. If unavailable, it will then look for the generic error handler (`IConsumerErrorHandler`). This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized. - -### Abandon #### Azure Service Bus -The Azure Service Bus transport has full support for abandoning messages to the dead letter queue. - -#### RabbitMQ -Abandon will issue a `Nack` with `requeue: false`. - -#### Other transports -No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown. +| ProcessResult | Description | +| ------------- | ---------------------------------------------------------------------------------- | +| DeadLetter | Abandons further processing of the message by sending it to the dead letter queue. | -### Failure #### RabbitMQ -While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler. +| ProcessResult | Description | +| ------------- | --------------------------------------------------------------- | +| Requeue | Return the message to the queue for re-processing 1. | -Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions. - -```cs -.Handle(x => x - .Queue("echo-request-handler") - .ExchangeBinding("test-echo") - .DeadLetterExchange("echo-request-handler-dlq") - // requeue a message on failure - .RequeueOnFailure() - .WithHandler()) -``` +1 RabbitMQ does not have a maximum delivery count. Please use `Requeue` with caution as, if no other conditions are applied, it may result in an infinite message loop. -### Example usage -Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation): +Example retry with exponential back-off and short-curcuit to dead letter exchange on non-transient exceptions (using the [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) abstract implementation): ```cs -public class RetryHandler : ConsumerErrorHandler +public class RetryHandler : RabbitMqConsumerErrorHandler { private static readonly Random _random = new(); @@ -1158,17 +1145,20 @@ public class RetryHandler : ConsumerErrorHandler { if (!IsTranientException(exception)) { - return Abandon(); + return Failure(); } if (attempts < 3) { var delay = (attempts * 1000) + (_random.Next(1000) - 500); await Task.Delay(delay, consumerContext.CancellationToken); + + // in process retry return Retry(); } - return Failure(); + // re-qeuue for out of process execution + return Requeue(); } private static bool IsTransientException(Exception exception) diff --git a/docs/intro.t.md b/docs/intro.t.md index 6d64a390..ee6857b7 100644 --- a/docs/intro.t.md +++ b/docs/intro.t.md @@ -38,6 +38,8 @@ - [Order of Execution](#order-of-execution) - [Generic interceptors](#generic-interceptors) - [Error Handling](#error-handling) + - [Azure Service Bus](#azure-service-bus) + - [RabbitMQ](#rabbitmq) - [Logging](#logging) - [Debugging](#debugging) - [Provider specific functionality](#provider-specific-functionality) @@ -1063,11 +1065,10 @@ Message processing by consumers or handlers may result in exceptions. The [ICons @[:cs](../src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs,Interface) -The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline. +The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options. -| Result | Description | +| ProcessResult | Description | | ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** | | Failure | The message failed to be processed and should be returned to the queue | | Success | The pipeline must treat the message as having been processed successfully | | SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) | @@ -1083,52 +1084,38 @@ services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabb services.AddTransient(typeof(IConsumerErrorHandler<>), typeof(CustomConsumerErrorHandler<>)); ``` -Transport plugins provide specialized error handling interfaces. Examples include: +Transport plugins provide specialized error handling interfaces with a default implementation that includes any additional `ProcessResult` options. Examples include: -- [IMemoryConsumerErrorHandler](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) -- [IRabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) -- [IKafkaConsumerErrorHandler](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) -- [IRedisConsumerErrorHandler](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) -- [INatsConsumerErrorHandler](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) -- [IServiceBusConsumerErrorHandler](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) -- [IEventHubConsumerErrorHandler](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) -- [ISqsConsumerErrorHandler](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) +| Interface | Implementation including reference to additional options (if any) | +| ---------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | +| [IMemoryConsumerErrorHandler](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | [MemoryConsumerErrorHandler](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | +| [IRabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | +| [IKafkaConsumerErrorHandler](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | [KafkaConsumerErrorHandler](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | +| [IRedisConsumerErrorHandler](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | [RedisConsumerErrorHandler](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | +| [INatsConsumerErrorHandler](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | [NatsConsumerErrorHandler](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | +| [IServiceBusConsumerErrorHandler](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | [ServiceBusConsumerErrorHandler](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | +| [IEventHubConsumerErrorHandler](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | [EventHubConsumerErrorHandler](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | +| [ISqsConsumerErrorHandler](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | [SqsConsumerErrorHandler](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | > The message processing pipeline will always attempt to use the transport-specific error handler (e.g., `IMemoryConsumerErrorHandler`) first. If unavailable, it will then look for the generic error handler (`IConsumerErrorHandler`). This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized. - -### Abandon #### Azure Service Bus -The Azure Service Bus transport has full support for abandoning messages to the dead letter queue. - -#### RabbitMQ -Abandon will issue a `Nack` with `requeue: false`. - -#### Other transports -No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown. +| ProcessResult | Description | +| ------------- | ---------------------------------------------------------------------------------- | +| DeadLetter | Abandons further processing of the message by sending it to the dead letter queue. | -### Failure #### RabbitMQ -While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler. +| ProcessResult | Description | +| ------------- | --------------------------------------------------------------- | +| Requeue | Return the message to the queue for re-processing 1. | -Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions. - -```cs -.Handle(x => x - .Queue("echo-request-handler") - .ExchangeBinding("test-echo") - .DeadLetterExchange("echo-request-handler-dlq") - // requeue a message on failure - .RequeueOnFailure() - .WithHandler()) -``` +1 RabbitMQ does not have a maximum delivery count. Please use `Requeue` with caution as, if no other conditions are applied, it may result in an infinite message loop. -### Example usage -Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation): +Example retry with exponential back-off and short-curcuit to dead letter exchange on non-transient exceptions (using the [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) abstract implementation): ```cs -public class RetryHandler : ConsumerErrorHandler +public class RetryHandler : RabbitMqConsumerErrorHandler { private static readonly Random _random = new(); @@ -1136,17 +1123,20 @@ public class RetryHandler : ConsumerErrorHandler { if (!IsTranientException(exception)) { - return Abandon(); + return Failure(); } if (attempts < 3) { var delay = (attempts * 1000) + (_random.Next(1000) - 500); await Task.Delay(delay, consumerContext.CancellationToken); + + // in process retry return Retry(); } - return Failure(); + // re-qeuue for out of process execution + return Requeue(); } private static bool IsTransientException(Exception exception) diff --git a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs index b5fec9ad..95818eb7 100644 --- a/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs @@ -116,11 +116,6 @@ protected async Task Run() .ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, x.Value)); var r = await MessageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false); - if (r.Result == ProcessResult.Abandon) - { - throw new NotSupportedException("Transport does not support abandoning messages"); - } - if (r.Exception != null) { Logger.LogError(r.Exception, "Message processing error - Queue: {Queue}, MessageId: {MessageId}", Path, message.MessageId); diff --git a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs index 853b12da..f84f7a79 100644 --- a/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumer.cs @@ -42,11 +42,6 @@ public async Task ProcessEventAsync(ProcessEventArgs args) var headers = GetHeadersFromTransportMessage(args.Data); var r = await MessageProcessor.ProcessMessage(args.Data, headers, cancellationToken: args.CancellationToken).ConfigureAwait(false); - if (r.Result == ProcessResult.Abandon) - { - throw new NotSupportedException("Transport does not support abandoning messages"); - } - if (r.Exception != null) { // Note: The OnMessageFaulted was called at this point by the MessageProcessor. diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs index daba0eb8..326a0991 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs @@ -171,19 +171,19 @@ protected async Task ProcessMessageAsyncInternal( var r = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, cancellationToken: token).ConfigureAwait(false); switch (r.Result) { - case ProcessResult.Success: + case ProcessResult.SuccessState: // Complete the message so that it is not received again. // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default). Logger.LogDebug("Complete message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); await completeMessage(message, token).ConfigureAwait(false); return; - case ProcessResult.Abandon: + case ServiceBusProcessResult.DeadLetterState: Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId); await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false); return; - case ProcessResult.Fail: + case ProcessResult.FailureState: var messageProperties = new Dictionary(); { // Set the exception message diff --git a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs index 3e963ae2..21cfd89f 100644 --- a/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs @@ -2,4 +2,17 @@ public interface IServiceBusConsumerErrorHandler : IConsumerErrorHandler; -public abstract class ServiceBusConsumerErrorHandler : ConsumerErrorHandler, IServiceBusConsumerErrorHandler; \ No newline at end of file +public abstract class ServiceBusConsumerErrorHandler : ConsumerErrorHandler, IServiceBusConsumerErrorHandler +{ + public ProcessResult DeadLetter() => ServiceBusProcessResult.DeadLetter; +} + +public record ServiceBusProcessResult : ProcessResult +{ + /// + /// The message must be sent to the dead letter queue. + /// + public static readonly ProcessResult DeadLetter = new DeadLetterState(); + + public record DeadLetterState() : ProcessResult(); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs index 0474acab..d4b73d02 100644 --- a/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs +++ b/src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumer.cs @@ -115,11 +115,6 @@ public async Task OnMessage(ConsumeResult message) } var r = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false); - if (r.Result == ProcessResult.Abandon) - { - throw new NotSupportedException("Transport does not support abandoning messages"); - } - if (r.Exception != null) { // The IKafkaConsumerErrorHandler and OnMessageFaulted was called at this point by the MessageProcessor. diff --git a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs index ed5d7e74..20b36e1a 100644 --- a/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs +++ b/src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs @@ -158,11 +158,6 @@ private async Task ProduceInternal(object me var serviceProvider = targetBus?.ServiceProvider ?? Settings.ServiceProvider; // Execute the message processor in synchronous manner var r = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, currentServiceProvider: serviceProvider, cancellationToken: cancellationToken); - if (r.Result == ProcessResult.Abandon) - { - throw new NotSupportedException("Transport does not support abandoning messages"); - } - if (r.Exception != null) { // We want to pass the same exception to the sender as it happened in the handler/consumer diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs index e7b368fd..e417799d 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqConsumerBuilderExtensions.cs @@ -118,19 +118,4 @@ public static TConsumerBuilder AcknowledgementMode(this TConsu builder.ConsumerSettings.Properties[RabbitMqProperties.MessageAcknowledgementMode] = mode; return builder; } - - /// - /// Requeues a message on failure. Abandoned messages will not be requeued. - /// This may lead to an infinite loop if the reason for the failure is not transient. Responsibility lies with the developer to ensure that this flow does not occur. - /// - /// - /// - /// - /// - public static TConsumerBuilder RequeueOnFailure(this TConsumerBuilder builder, bool state = true) - where TConsumerBuilder : AbstractConsumerBuilder - { - builder.ConsumerSettings.Properties[RabbitMqProperties.ReqeueOnFailure] = state; - return builder; - } } \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs index 1e32d821..2d56d75c 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqProperties.cs @@ -31,6 +31,4 @@ static internal class RabbitMqProperties public static readonly string ProviderSettings = $"RabbitMQ_{nameof(ProviderSettings)}"; public static readonly string MessageAcknowledgementMode = $"RabbitMQ_{nameof(MessageAcknowledgementMode)}"; - - public static readonly string ReqeueOnFailure = $"RabbitMQ_{nameof(ReqeueOnFailure)}"; } diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs index 4bac0cde..8778d657 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs @@ -2,4 +2,17 @@ public interface IRabbitMqConsumerErrorHandler : IConsumerErrorHandler; -public abstract class RabbitMqConsumerErrorHandler : ConsumerErrorHandler, IRabbitMqConsumerErrorHandler; \ No newline at end of file +public abstract class RabbitMqConsumerErrorHandler : ConsumerErrorHandler, IRabbitMqConsumerErrorHandler +{ + public virtual ProcessResult Requeue() => RabbitMqProcessResult.Requeue; +} + +public record RabbitMqProcessResult : ProcessResult +{ + /// + /// The message should be placed back into the queue. + /// + public static readonly ProcessResult Requeue = new RequeueState(); + + public record RequeueState() : ProcessResult(); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs index 439aef92..f2f18de2 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqAutoAcknowledgeMessageProcessor.cs @@ -13,7 +13,6 @@ internal sealed class RabbitMqAutoAcknowledgeMessageProcessor : IMessageProcesso private readonly ILogger _logger; private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode; private readonly IRabbitMqConsumer _consumer; - private readonly bool _requeueOnFailure; public RabbitMqAutoAcknowledgeMessageProcessor( IMessageProcessor target, @@ -25,8 +24,6 @@ public RabbitMqAutoAcknowledgeMessageProcessor( _logger = logger; _acknowledgementMode = acknowledgementMode; _consumer = consumer; - - _requeueOnFailure = _target.ConsumerSettings?.All(x => x.GetOrDefault(RabbitMqProperties.ReqeueOnFailure, false)) ?? false; } public IReadOnlyCollection ConsumerSettings => _target.ConsumerSettings; @@ -47,10 +44,9 @@ public async Task ProcessMessage(BasicDeliverEventArgs tra // Acknowledge after processing var confirmOption = r.Result switch { - ProcessResult.Abandon => RabbitMqMessageConfirmOptions.Nack, // NAck after processing when message fails with non-transient exception (unless the user already acknowledged in any way). - ProcessResult.Fail when (_requeueOnFailure) => RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue, // Re-queue after processing on transient failure - ProcessResult.Fail when (!_requeueOnFailure) => RabbitMqMessageConfirmOptions.Nack, // Fail after processing failure (no re-queue) - ProcessResult.Success => RabbitMqMessageConfirmOptions.Ack, // Acknowledge after processing + RabbitMqProcessResult.RequeueState => RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue, // Re-queue after processing on transient failure + ProcessResult.FailureState => RabbitMqMessageConfirmOptions.Nack, // Fail after processing failure (no re-queue) + ProcessResult.SuccessState => RabbitMqMessageConfirmOptions.Ack, // Acknowledge after processing _ => throw new NotImplementedException() }; diff --git a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs index 3ce14894..96b1c250 100644 --- a/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs +++ b/src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqResponseConsumer.cs @@ -3,7 +3,6 @@ public class RabbitMqResponseConsumer : AbstractRabbitMqConsumer { private readonly IMessageProcessor _messageProcessor; - private readonly bool _requeueOnFailure; protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade; @@ -19,7 +18,6 @@ public RabbitMqResponseConsumer( : base(loggerFactory.CreateLogger(), channel, queueName, headerValueConverter) { _messageProcessor = new ResponseMessageProcessor(loggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider); - _requeueOnFailure = requestResponseSettings.GetOrDefault(RabbitMqProperties.ReqeueOnFailure, false); } protected override async Task OnMessageReceived(Dictionary messageHeaders, BasicDeliverEventArgs transportMessage) @@ -27,15 +25,15 @@ protected override async Task OnMessageReceived(Dictionary>, IConsumerContext, Task>> ConsumerInterceptorType { get; } public IGenericTypeCache2> HandlerInterceptorType { get; } - public IGenericTypeCache>> ConsumerErrorHandlerType { get; } + public IGenericTypeCache>> ConsumerErrorHandlerType { get; } public RuntimeTypeCache() { @@ -78,7 +78,7 @@ public RuntimeTypeCache() typeof(IRequestHandlerInterceptor<,>), nameof(IRequestHandlerInterceptor.OnHandle)); - ConsumerErrorHandlerType = new GenericTypeCache>>( + ConsumerErrorHandlerType = new GenericTypeCache>>( typeof(IConsumerErrorHandler<>), nameof(IConsumerErrorHandler.OnHandleError)); } diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs index f031335f..32f6fc89 100644 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs @@ -2,13 +2,12 @@ public abstract class ConsumerErrorHandler : BaseConsumerErrorHandler, IConsumerErrorHandler { - public abstract Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); + public abstract Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); } public abstract class BaseConsumerErrorHandler { - public virtual ConsumerErrorHandlerResult Abandon() => ConsumerErrorHandlerResult.Abandon; - public virtual ConsumerErrorHandlerResult Failure() => ConsumerErrorHandlerResult.Failure; - public virtual ConsumerErrorHandlerResult Retry() => ConsumerErrorHandlerResult.Retry; - public virtual ConsumerErrorHandlerResult Success(object response = null) => response == null ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.SuccessWithResponse(response); + public virtual ProcessResult Failure() => ProcessResult.Failure; + public virtual ProcessResult Retry() => ProcessResult.Retry; + public virtual ProcessResult Success(object response = null) => response == null ? ProcessResult.Success : ProcessResult.SuccessWithResponse(response); } \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs deleted file mode 100644 index fda3910e..00000000 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandlerResult.cs +++ /dev/null @@ -1,44 +0,0 @@ -namespace SlimMessageBus.Host; - -public record ConsumerErrorHandlerResult -{ - private static readonly object _noResponse = new(); - - private ConsumerErrorHandlerResult(ProcessResult result, object response = null) - { - Result = result; - Response = response ?? _noResponse; - } - - public ProcessResult Result { get; private set; } - public object Response { get; private set; } - public bool HasResponse => !ReferenceEquals(Response, _noResponse); - - /// - /// the message should be abandoned and placed in the dead letter queue. - /// - /// - /// This feature is not supported by every transport. - /// - public static readonly ConsumerErrorHandlerResult Abandon = new(ProcessResult.Abandon); - - /// - /// The message should be placed back into the queue. - /// - public static readonly ConsumerErrorHandlerResult Failure = new(ProcessResult.Fail); - - /// - /// The message processor should evaluate the message as having been processed successfully. - /// - public static readonly ConsumerErrorHandlerResult Success = new(ProcessResult.Success); - - /// - /// The message processor should evaluate the message as having been processed successfully and use the specified fallback response for the or . - /// - public static ConsumerErrorHandlerResult SuccessWithResponse(object response) => new(ProcessResult.Success, response); - - /// - /// Retry processing the message without placing it back in the queue. - /// - public static readonly ConsumerErrorHandlerResult Retry = new(ProcessResult.Retry); -} diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs index 5c40030d..a742d3a7 100644 --- a/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/IConsumerErrorHandler.cs @@ -20,6 +20,6 @@ public interface IConsumerErrorHandler /// Exception that occurred during message processing. /// The number of times the message has been attempted to be processed. /// The error handling result. - Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); + Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts); } // doc:fragment:Interface \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/ErrorHandling/ProcessResult.cs b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ProcessResult.cs new file mode 100644 index 00000000..49d41460 --- /dev/null +++ b/src/SlimMessageBus.Host/Consumer/ErrorHandling/ProcessResult.cs @@ -0,0 +1,39 @@ +namespace SlimMessageBus.Host; + +public abstract record ProcessResult +{ + private static readonly object _noResponse = new(); + + protected ProcessResult(object response = null) + { + Response = response ?? _noResponse; + } + + public object Response { get; } + public bool HasResponse => !ReferenceEquals(Response, _noResponse); + + /// + /// The message should be placed back into the queue. + /// + public static readonly ProcessResult Failure = new FailureState(); + + /// + /// Retry processing the message without placing it back in the queue. + /// + public static readonly ProcessResult Retry = new RetryState(); + + /// + /// The message processor should evaluate the message as having been processed successfully. + /// + public static readonly ProcessResult Success = new SuccessState(); + + /// + /// The message processor should evaluate the message as having been processed successfully and use the specified fallback response for the or . + /// + public static ProcessResult SuccessWithResponse(object response) => new SuccessStateWithResponse(response); + + public record FailureState() : ProcessResult(); + public record RetryState() : ProcessResult(); + public record SuccessState(object Response = null) : ProcessResult(Response); + public record SuccessStateWithResponse(object Response) : SuccessState(Response); +} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs index 1ccebe3a..0fba6f97 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ConcurrentMessageProcessorDecorator.cs @@ -58,7 +58,7 @@ public async Task ProcessMessage(TMessage transportMessage { // report the last exception _lastException = null; - return new(ProcessResult.Fail, e, _lastExceptionSettings, null); + return new(ProcessResult.Failure, e, _lastExceptionSettings, null); } Interlocked.Increment(ref _pendingCount); diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs index c1dfd997..62d873ae 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs @@ -109,19 +109,19 @@ public MessageHandler( { attempts++; var handleErrorResult = await DoHandleError(message, messageType, messageScope, consumerContext, ex, attempts, cancellationToken).ConfigureAwait(false); - if (handleErrorResult.Result == ProcessResult.Retry) + if (handleErrorResult is ProcessResult.RetryState) { continue; } - var exception = handleErrorResult.Result != ProcessResult.Success ? ex : null; + var exception = handleErrorResult is not ProcessResult.SuccessState ? ex : null; var response = handleErrorResult.HasResponse ? handleErrorResult.Response : null; - return (handleErrorResult.Result, response, exception, requestId); + return (handleErrorResult, response, exception, requestId); } } catch (Exception e) { - return (ProcessResult.Fail, null, e, requestId); + return (ProcessResult.Failure, null, e, requestId); } finally { @@ -148,9 +148,9 @@ private async Task DoHandleInternal(object message, IMessageTypeConsumer return await ExecuteConsumer(message, consumerContext, consumerInvoker, responseType).ConfigureAwait(false); } - private async Task DoHandleError(object message, Type messageType, IMessageScope messageScope, IConsumerContext consumerContext, Exception ex, int attempts, CancellationToken cancellationToken) + private async Task DoHandleError(object message, Type messageType, IMessageScope messageScope, IConsumerContext consumerContext, Exception ex, int attempts, CancellationToken cancellationToken) { - var errorHandlerResult = ConsumerErrorHandlerResult.Failure; + var errorHandlerResult = ProcessResult.Failure; // Use the bus provider specific error handler type first (if provided) var consumerErrorHandler = ConsumerErrorHandlerOpenGenericType is not null diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index 6c96d0e5..74f19aa2 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -99,7 +99,7 @@ public async virtual Task ProcessMessage(TTransportMessage (result, lastResponse, lastException, var requestId) = await DoHandle(message, messageHeaders, consumerInvoker, transportMessage, consumerContextProperties, currentServiceProvider, cancellationToken).ConfigureAwait(false); - Debug.Assert(result != ProcessResult.Retry); + Debug.Assert(result is not ProcessResult.RetryState); if (consumerInvoker.ParentSettings.ConsumerMode == ConsumerMode.RequestResponse && _responseProducer != null) { diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs deleted file mode 100644 index 6e100377..00000000 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ProcessResult.cs +++ /dev/null @@ -1,9 +0,0 @@ -namespace SlimMessageBus.Host; - -public enum ProcessResult -{ - Abandon, - Fail, - Retry, - Success -} \ No newline at end of file diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs index 69ee4ff2..2290b8f8 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/ResponseMessageProcessor.cs @@ -47,7 +47,7 @@ public Task ProcessMessage(TTransportMessage transportMess ex = e; } - var result = ex == null ? ProcessResult.Success : ProcessResult.Fail; + var result = ex == null ? ProcessResult.Success : ProcessResult.Failure; return Task.FromResult(new ProcessMessageResult(result, ex, _requestResponseSettings, null)); } diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index 39ff2b16..f6d03ace 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -12,42 +12,31 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Samples", "Samples", "{A5B1 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Content", "Content", "{8835721D-ECA4-42CB-BA8B-F8C1A06340EC}" ProjectSection(SolutionItems) = preProject - Samples\Content\DSC0843.jpg = Samples\Content\DSC0843.jpg Samples\Content\DSC0862.jpg = Samples\Content\DSC0862.jpg - Samples\Content\DSC1042.jpg = Samples\Content\DSC1042.jpg Samples\Content\DSC1044.jpg = Samples\Content\DSC1044.jpg Samples\Content\DSC2145.jpg = Samples\Content\DSC2145.jpg Samples\Content\DSC2152.jpg = Samples\Content\DSC2152.jpg - Samples\Content\DSC2468.jpg = Samples\Content\DSC2468.jpg Samples\Content\DSC3714.jpg = Samples\Content\DSC3714.jpg Samples\Content\DSC3718.jpg = Samples\Content\DSC3718.jpg - Samples\Content\DSC3720.jpg = Samples\Content\DSC3720.jpg Samples\Content\DSC3781.jpg = Samples\Content\DSC3781.jpg Samples\Content\DSC3808.jpg = Samples\Content\DSC3808.jpg Samples\Content\DSC3884.jpg = Samples\Content\DSC3884.jpg - Samples\Content\DSC4029.jpg = Samples\Content\DSC4029.jpg Samples\Content\DSC4037.jpg = Samples\Content\DSC4037.jpg Samples\Content\DSC4038.jpg = Samples\Content\DSC4038.jpg - Samples\Content\DSC4205.jpg = Samples\Content\DSC4205.jpg Samples\Content\DSC4216.jpg = Samples\Content\DSC4216.jpg - Samples\Content\DSC5718.jpg = Samples\Content\DSC5718.jpg Samples\Content\DSC5819.jpg = Samples\Content\DSC5819.jpg Samples\Content\DSC5839.jpg = Samples\Content\DSC5839.jpg Samples\Content\DSC8169.jpg = Samples\Content\DSC8169.jpg Samples\Content\DSC8177.jpg = Samples\Content\DSC8177.jpg Samples\Content\DSC8327.jpg = Samples\Content\DSC8327.jpg - Samples\Content\DSC8333.jpg = Samples\Content\DSC8333.jpg Samples\Content\DSC8462.jpg = Samples\Content\DSC8462.jpg Samples\Content\DSC8789.jpg = Samples\Content\DSC8789.jpg - Samples\Content\DSC9135.jpg = Samples\Content\DSC9135.jpg Samples\Content\DSC9230.jpg = Samples\Content\DSC9230.jpg Samples\Content\DSC9235.jpg = Samples\Content\DSC9235.jpg - Samples\Content\DSC9319.jpg = Samples\Content\DSC9319.jpg Samples\Content\DSC9323.jpg = Samples\Content\DSC9323.jpg Samples\Content\DSC9329.jpg = Samples\Content\DSC9329.jpg Samples\Content\DSC9823.jpg = Samples\Content\DSC9823.jpg Samples\Content\DSC9827.jpg = Samples\Content\DSC9827.jpg - Samples\Content\DSC9831.jpg = Samples\Content\DSC9831.jpg Samples\Content\DSC9839.jpg = Samples\Content\DSC9839.jpg Samples\Content\DSC9892.jpg = Samples\Content\DSC9892.jpg Samples\Content\DSC9904.jpg = Samples\Content\DSC9904.jpg @@ -140,6 +129,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{CBE53E71-7 ..\docs\intro.t.md = ..\docs\intro.t.md ..\docs\NuGet.md = ..\docs\NuGet.md ..\docs\plugin_asyncapi.t.md = ..\docs\plugin_asyncapi.t.md + ..\docs\plugin_outbox.t.md = ..\docs\plugin_outbox.t.md ..\docs\provider_amazon_sqs.t.md = ..\docs\provider_amazon_sqs.t.md ..\docs\provider_azure_eventhubs.md = ..\docs\provider_azure_eventhubs.md ..\docs\provider_azure_servicebus.md = ..\docs\provider_azure_servicebus.md @@ -148,7 +138,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Docs", "Docs", "{CBE53E71-7 ..\docs\provider_memory.t.md = ..\docs\provider_memory.t.md ..\docs\provider_mqtt.md = ..\docs\provider_mqtt.md ..\docs\provider_nats.t.md = ..\docs\provider_nats.t.md - ..\docs\plugin_outbox.t.md = ..\docs\plugin_outbox.t.md ..\docs\provider_redis.md = ..\docs\provider_redis.md ..\docs\README.md = ..\docs\README.md ..\README.md = ..\README.md diff --git a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs index edeb6aa5..22560fd2 100644 --- a/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusIt.cs @@ -116,7 +116,7 @@ public async Task BasicPubSubOnQueue(bool bulkProduce) } [Fact] - public async Task AbandonedMessage_DeliveredToDeadLetterQueue() + public async Task DeadLetterMessage_IsDeliveredTo_DeadLetterQueue() { // arrange var queue = QueueName(); @@ -135,8 +135,8 @@ public async Task AbandonedMessage_DeliveredToDeadLetterQueue() return ActivatorUtilities.CreateInstance(sp, connectionString); }); - services.AddScoped(typeof(IConsumerInterceptor<>), typeof(AbandonPingMessageInterceptor<>)); - services.AddScoped(typeof(IServiceBusConsumerErrorHandler<>), typeof(AbandonMessageConsumerErrorHandler<>)); + services.AddScoped(typeof(IConsumerInterceptor<>), typeof(ThrowExceptionPingMessageInterceptor<>)); + services.AddScoped(typeof(IServiceBusConsumerErrorHandler<>), typeof(DeadLetterMessageConsumerErrorHandler<>)); }); AddBusConfiguration(mbb => @@ -186,7 +186,7 @@ public async Task AbandonedMessage_DeliveredToDeadLetterQueue() } } - public class AbandonPingMessageInterceptor : IConsumerInterceptor + public class ThrowExceptionPingMessageInterceptor : IConsumerInterceptor { public async Task OnHandle(T message, Func> next, IConsumerContext context) { @@ -196,11 +196,11 @@ public async Task OnHandle(T message, Func> next, IConsumer } } - public class AbandonMessageConsumerErrorHandler : ServiceBusConsumerErrorHandler + public class DeadLetterMessageConsumerErrorHandler : ServiceBusConsumerErrorHandler { - public override Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + public override Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) { - return Task.FromResult(Abandon()); + return Task.FromResult(DeadLetter()); } } diff --git a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs index 1e934b8b..6f0316d9 100644 --- a/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs +++ b/src/Tests/SlimMessageBus.Host.Memory.Test/MemoryMessageBusTests.cs @@ -430,7 +430,7 @@ public async Task When_Publish_Given_AConsumersThatThrowsException_Then_Exceptio var consumerErrorHandlerMock = new Mock>(); consumerErrorHandlerMock .Setup(x => x.OnHandleError(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(() => errorHandlerHandlesError ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.Failure); + .ReturnsAsync(() => errorHandlerHandlesError ? ProcessResult.Success : ProcessResult.Failure); _serviceProviderMock.ProviderMock .Setup(x => x.GetService(typeof(IConsumer))) @@ -478,7 +478,7 @@ public async Task When_Send_Given_AHandlerThatThrowsException_Then_ExceptionIsBu var consumerErrorHandlerMock = new Mock>(); consumerErrorHandlerMock .Setup(x => x.OnHandleError(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(() => errorHandlerHandlesError ? ConsumerErrorHandlerResult.SuccessWithResponse(null) : ConsumerErrorHandlerResult.Failure); + .ReturnsAsync(() => errorHandlerHandlesError ? ProcessResult.SuccessWithResponse(null) : ProcessResult.Failure); _serviceProviderMock.ProviderMock .Setup(x => x.GetService(typeof(IRequestHandler))) diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs index 43557685..ed67f89e 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/Consumer/RabbitMqAutoAcknowledgeMessageProcessorTests.cs @@ -35,11 +35,25 @@ public void When_Dispose_Then_CallsDisposeOnTarget() _messageProcessorDisposableMock.Verify(x => x.Dispose(), Times.Once); } - [Theory] - [InlineData(ProcessResult.Abandon, RabbitMqMessageConfirmOptions.Nack)] - [InlineData(ProcessResult.Fail, RabbitMqMessageConfirmOptions.Nack)] - [InlineData(ProcessResult.Success, RabbitMqMessageConfirmOptions.Ack)] - public async Task When_ProcessMessage_Then_AutoAcknowledge(ProcessResult processResult, RabbitMqMessageConfirmOptions expected) + [Fact] + public Task When_Requeue_ThenAutoNackWithRequeue() + { + return When_ProcessMessage_Then_AutoAcknowledge(RabbitMqProcessResult.Requeue, RabbitMqMessageConfirmOptions.Nack | RabbitMqMessageConfirmOptions.Requeue); + } + + [Fact] + public Task When_Failure_ThenAutoNack() + { + return When_ProcessMessage_Then_AutoAcknowledge(RabbitMqProcessResult.Failure, RabbitMqMessageConfirmOptions.Nack); + } + + [Fact] + public Task When_Success_AutoAcknowledge() + { + return When_ProcessMessage_Then_AutoAcknowledge(RabbitMqProcessResult.Success, RabbitMqMessageConfirmOptions.Ack); + } + + private async Task When_ProcessMessage_Then_AutoAcknowledge(ProcessResult processResult, RabbitMqMessageConfirmOptions expected) { // arrange _messageProcessorMock diff --git a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs index 3c884a16..3b6c6413 100644 --- a/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs +++ b/src/Tests/SlimMessageBus.Host.RabbitMQ.Test/IntegrationTests/RabbitMqMessageBusIt.cs @@ -371,7 +371,7 @@ public static Task SimulateFakeException(int counter) /// public class CustomRabbitMqConsumerErrorHandler : IRabbitMqConsumerErrorHandler { - public Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) + public Task OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts) { // Check if this is consumer context for RabbitMQ var isRabbitMqContext = consumerContext.GetTransportMessage() != null; @@ -390,7 +390,7 @@ public Task OnHandleError(T message, IConsumerContex } return Task.FromResult(isRabbitMqContext - ? ConsumerErrorHandlerResult.Success - : ConsumerErrorHandlerResult.Failure); + ? ProcessResult.Success + : ProcessResult.Failure); } } diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs index 328f123a..2829c30f 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/ConcurrentMessageProcessorDecoratorTest.cs @@ -147,7 +147,7 @@ public async Task When_ProcessMessage_Given_ExceptionHappensOnTarget_Then_Except _messageProcessorMock .Setup(x => x.ProcessMessage(It.IsAny(), It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) - .ReturnsAsync(new ProcessMessageResult(ProcessResult.Fail, exception, null, null)); + .ReturnsAsync(new ProcessMessageResult(ProcessResult.Failure, exception, null, null)); var msg = new SomeMessage(); var msgHeaders = new Dictionary(); diff --git a/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs b/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs index 9c2030bf..060df69c 100644 --- a/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs +++ b/src/Tests/SlimMessageBus.Host.Test/Consumer/MessageHandlerTest.cs @@ -102,7 +102,7 @@ public async Task When_DoHandle_Given_ConsumerThatThrowsExceptionAndErrorHandler consumerErrorHandlerMock .Setup(x => x.OnHandleError(someMessage, It.IsAny(), someException, It.IsAny())) - .ReturnsAsync(() => errorHandlerWasAbleToHandle ? ConsumerErrorHandlerResult.Success : ConsumerErrorHandlerResult.Failure); + .ReturnsAsync(() => errorHandlerWasAbleToHandle ? ProcessResult.Success : ProcessResult.Failure); if (errorHandlerRegistered) { @@ -177,7 +177,7 @@ public async Task When_DoHandle_Given_ConsumerThatThrowsExceptionAndErrorHandler consumerErrorHandlerMock .Setup(x => x.OnHandleError(someMessage, It.IsAny(), someException, It.IsAny())) - .ReturnsAsync(() => ConsumerErrorHandlerResult.Retry); + .ReturnsAsync(() => ProcessResult.Retry); busMock.ServiceProviderMock .Setup(x => x.GetService(typeof(IConsumerErrorHandler)))