Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host] Consumer error response per transport (#356) #359

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 30 additions & 40 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
- [Order of Execution](#order-of-execution)
- [Generic interceptors](#generic-interceptors)
- [Error Handling](#error-handling)
- [Azure Service Bus](#azure-service-bus)
- [RabbitMQ](#rabbitmq)
- [Logging](#logging)
- [Debugging](#debugging)
- [Provider specific functionality](#provider-specific-functionality)
Expand Down Expand Up @@ -1081,15 +1083,14 @@ public interface IConsumerErrorHandler<in T>
/// <param name="exception">Exception that occurred during message processing.</param>
/// <param name="attempts">The number of times the message has been attempted to be processed.</param>
/// <returns>The error handling result.</returns>
Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
Task<ProcessResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts);
}
```

The returned `ConsumerErrorHandlerResult` object is used to override the execution for the remainder of the execution pipeline.
The returned `ProcessResult` object is used to override the execution for the remainder of the execution pipeline. Some transports provide additional options.

| Result | Description |
| ProcessResult | Description |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** |
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
Expand All @@ -1105,70 +1106,59 @@ services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabb
services.AddTransient(typeof(IConsumerErrorHandler<>), typeof(CustomConsumerErrorHandler<>));
```

Transport plugins provide specialized error handling interfaces. Examples include:
Transport plugins provide specialized error handling interfaces with a default implementation that includes any additional `ProcessResult` options. Examples include:

- [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs)
- [IRabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs)
- [IKafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs)
- [IRedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs)
- [INatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs)
- [IServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs)
- [IEventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs)
- [ISqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs)
| Interface | Implementation including reference to additional options (if any) |
| ---------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- |
| [IMemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) | [MemoryConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Memory/Consumers/IMemoryConsumerErrorHandler.cs) |
| [IRabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) | [RabbitMqConsumerErrorHandler<T>](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) |
| [IKafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) | [KafkaConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Kafka/Consumer/IKafkaConsumerErrorHandler.cs) |
| [IRedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) | [RedisConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Redis/Consumers/IRedisConsumerErrorHandler.cs) |
| [INatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) | [NatsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.Nats/INatsConsumerErrorHandler.cs) |
| [IServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) | [ServiceBusConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureServiceBus/Consumer/IServiceBusConsumerErrorHandler.cs) |
| [IEventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) | [EventHubConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AzureEventHub/Consumer/IEventHubConsumerErrorHandler.cs) |
| [ISqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) | [SqsConsumerErrorHandler<T>](../src/SlimMessageBus.Host.AmazonSQS/Consumer/ISqsConsumerErrorHandler.cs) |

> The message processing pipeline will always attempt to use the transport-specific error handler (e.g., `IMemoryConsumerErrorHandler<T>`) first. If unavailable, it will then look for the generic error handler (`IConsumerErrorHandler<T>`).

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.


### Abandon
#### Azure Service Bus
The Azure Service Bus transport has full support for abandoning messages to the dead letter queue.

#### RabbitMQ
Abandon will issue a `Nack` with `requeue: false`.

#### Other transports
No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown.
| ProcessResult | Description |
| ------------- | ---------------------------------------------------------------------------------- |
| DeadLetter | Abandons further processing of the message by sending it to the dead letter queue. |

### Failure
#### RabbitMQ
While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler.
| ProcessResult | Description |
| ------------- | --------------------------------------------------------------- |
| Requeue | Return the message to the queue for re-processing <sup>1</sup>. |

Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions.

```cs
.Handle<EchoRequest, EchoResponse>(x => x
.Queue("echo-request-handler")
.ExchangeBinding("test-echo")
.DeadLetterExchange("echo-request-handler-dlq")
// requeue a message on failure
.RequeueOnFailure()
.WithHandler<EchoRequestHandler>())
```
<sup>1</sup> RabbitMQ does not have a maximum delivery count. Please use `Requeue` with caution as, if no other conditions are applied, it may result in an infinite message loop.

### Example usage
Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):
Example retry with exponential back-off and short-curcuit to dead letter exchange on non-transient exceptions (using the [RabbitMqConsumerErrorHandler](../src/SlimMessageBus.Host.RabbitMQ/Consumers/IRabbitMqConsumerErrorHandler.cs) abstract implementation):
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
public class RetryHandler<T> : RabbitMqConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (!IsTranientException(exception))
{
return Abandon();
return Failure();
}

if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
await Task.Delay(delay, consumerContext.CancellationToken);

// in process retry
return Retry();
}

return Failure();
// re-qeuue for out of process execution
return Requeue();
}

private static bool IsTransientException(Exception exception)
Expand Down
Loading
Loading