Skip to content

Commit

Permalink
#356 Abandon/dead letter message
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <[email protected]>
  • Loading branch information
EtherZa authored and zarusz committed Jan 1, 2025
1 parent fd2e9e3 commit 15834b6
Show file tree
Hide file tree
Showing 41 changed files with 428 additions and 120 deletions.
48 changes: 45 additions & 3 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -1089,10 +1089,11 @@ The returned `ConsumerErrorHandlerResult` object is used to override the executi

| Result | Description |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** |
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) |
| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) |

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand All @@ -1119,15 +1120,47 @@ Transport plugins provide specialized error handling interfaces. Examples includ

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.

Sample retry with exponential back-off (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):

### Abandon
#### Azure Service Bus
The Azure Service Bus transport has full support for abandoning messages to the dead letter queue.

#### RabbitMQ
Abandon will issue a `Nack` with `requeue: false`.

#### Other transports
No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown.

### Failure
#### RabbitMQ
While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler.

Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions.

```cs
.Handle<EchoRequest, EchoResponse>(x => x
.Queue("echo-request-handler")
.ExchangeBinding("test-echo")
.DeadLetterExchange("echo-request-handler-dlq")
// requeue a message on failure
.RequeueOnFailure()
.WithHandler<EchoRequestHandler>())
```

### Example usage
Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (!IsTranientException(exception))
{
return Abandon();
}

if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
Expand All @@ -1137,9 +1170,18 @@ public class RetryHandler<T> : ConsumerErrorHandler<T>

return Failure();
}

private static bool IsTransientException(Exception exception)
{
while (exception is not SqlException && exception.InnerException != null)
{
exception = exception.InnerException;
}

return exception is SqlException { Number: -2 or 1205 }; // Timeout or deadlock
}
}
```

## Logging

SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions):
Expand Down
48 changes: 45 additions & 3 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -1067,10 +1067,11 @@ The returned `ConsumerErrorHandlerResult` object is used to override the executi

| Result | Description |
| ------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Abandon | The message should be sent to the dead letter queue/exchange. **Not supported by all transports.** |
| Failure | The message failed to be processed and should be returned to the queue |
| Success | The pipeline must treat the message as having been processed successfully |
| SuccessWithResponse | The pipeline to treat the message as having been processed successfully, returning the response to the request/response invocation ([IRequestResponseBus<T>](../src/SlimMessageBus/RequestResponse/IRequestResponseBus.cs)) |
| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) |
| Retry | Re-create and execute the pipeline (including the message scope when using [per-message DI container scopes](#per-message-di-container-scope)) |

To enable SMB to recognize the error handler, it must be registered within the Microsoft Dependency Injection (MSDI) framework:

Expand All @@ -1097,15 +1098,47 @@ Transport plugins provide specialized error handling interfaces. Examples includ

This approach allows for transport-specific error handling, ensuring that specialized handlers can be prioritized.

Sample retry with exponential back-off (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):

### Abandon
#### Azure Service Bus
The Azure Service Bus transport has full support for abandoning messages to the dead letter queue.

#### RabbitMQ
Abandon will issue a `Nack` with `requeue: false`.

#### Other transports
No other transports currently support `Abandon` and calling `Abandon` will result in `NotSupportedException` being thrown.

### Failure
#### RabbitMQ
While RabbitMQ supports dead letter exchanges, SMB's default implementation is not to requeue messages on `Failure`. If requeuing is required, it can be enabled by setting `RequeueOnFailure()` when configuring a consumer/handler.

Please be aware that as RabbitMQ does not have a maximum delivery count and enabling requeue may result in an infinite message loop. When `RequeueOnFailure()` has been set, it is the developer's responsibility to configure an appropriate `IConsumerErrorHandler` that will `Abandon` all non-transient exceptions.

```cs
.Handle<EchoRequest, EchoResponse>(x => x
.Queue("echo-request-handler")
.ExchangeBinding("test-echo")
.DeadLetterExchange("echo-request-handler-dlq")
// requeue a message on failure
.RequeueOnFailure()
.WithHandler<EchoRequestHandler>())
```

### Example usage
Retry with exponential back-off and short-curcuit dead letter on non-transient exceptions (using the [ConsumerErrorHandler](../src/SlimMessageBus.Host/Consumer/ErrorHandling/ConsumerErrorHandler.cs) abstract implementation):
```cs
public class RetryHandler<T> : ConsumerErrorHandler<T>
{
private static readonly Random _random = new();

public override async Task<ConsumerErrorHandlerResult> OnHandleError(T message, IConsumerContext consumerContext, Exception exception, int attempts)
{
if (!IsTranientException(exception))
{
return Abandon();
}

if (attempts < 3)
{
var delay = (attempts * 1000) + (_random.Next(1000) - 500);
Expand All @@ -1115,9 +1148,18 @@ public class RetryHandler<T> : ConsumerErrorHandler<T>

return Failure();
}

private static bool IsTransientException(Exception exception)
{
while (exception is not SqlException && exception.InnerException != null)
{
exception = exception.InnerException;
}

return exception is SqlException { Number: -2 or 1205 }; // Timeout or deadlock
}
}
```

## Logging

SlimMessageBus uses [Microsoft.Extensions.Logging.Abstractions](https://www.nuget.org/packages/Microsoft.Extensions.Logging.Abstractions):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface ISqsConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class SqsConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
public abstract class SqsConsumerErrorHandler<T> : ConsumerErrorHandler<T>, ISqsConsumerErrorHandler<T>;
5 changes: 5 additions & 0 deletions src/SlimMessageBus.Host.AmazonSQS/Consumer/SqsBaseConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ protected async Task Run()
.ToDictionary(x => x.Key, x => HeaderSerializer.Deserialize(x.Key, x.Value));

var r = await MessageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: CancellationToken).ConfigureAwait(false);
if (r.Result == ProcessResult.Abandon)
{
throw new NotSupportedException("Transport does not support abandoning messages");
}

if (r.Exception != null)
{
Logger.LogError(r.Exception, "Message processing error - Queue: {Queue}, MessageId: {MessageId}", Path, message.MessageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public async Task ProcessEventAsync(ProcessEventArgs args)

var headers = GetHeadersFromTransportMessage(args.Data);
var r = await MessageProcessor.ProcessMessage(args.Data, headers, cancellationToken: args.CancellationToken).ConfigureAwait(false);
if (r.Result == ProcessResult.Abandon)
{
throw new NotSupportedException("Transport does not support abandoning messages");
}

if (r.Exception != null)
{
// Note: The OnMessageFaulted was called at this point by the MessageProcessor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface IEventHubConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class EventHubConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
public abstract class EventHubConsumerErrorHandler<T> : ConsumerErrorHandler<T>, IEventHubConsumerErrorHandler<T>;
54 changes: 34 additions & 20 deletions src/SlimMessageBus.Host.AzureServiceBus/Consumer/AsbBaseConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,23 @@ private Task ServiceBusSessionProcessor_SessionClosingAsync(ProcessSessionEventA
}

private Task ServiceBusSessionProcessor_ProcessMessageAsync(ProcessSessionMessageEventArgs args)
=> ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.CancellationToken);
=> ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.DeadLetterMessageAsync, args.CancellationToken);

private Task ServiceBusSessionProcessor_ProcessErrorAsync(ProcessErrorEventArgs args)
=> ProcessErrorAsyncInternal(args.Exception, args.ErrorSource);

protected Task ServiceBusProcessor_ProcessMessagesAsync(ProcessMessageEventArgs args)
=> ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.CancellationToken);
=> ProcessMessageAsyncInternal(args.Message, args.CompleteMessageAsync, args.AbandonMessageAsync, args.DeadLetterMessageAsync, args.CancellationToken);

protected Task ServiceBusProcessor_ProcessErrorAsync(ProcessErrorEventArgs args)
=> ProcessErrorAsyncInternal(args.Exception, args.ErrorSource);

protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage message, Func<ServiceBusReceivedMessage, CancellationToken, Task> completeMessage, Func<ServiceBusReceivedMessage, IDictionary<string, object>, CancellationToken, Task> abandonMessage, CancellationToken token)
protected async Task ProcessMessageAsyncInternal(
ServiceBusReceivedMessage message,
Func<ServiceBusReceivedMessage, CancellationToken, Task> completeMessage,
Func<ServiceBusReceivedMessage, IDictionary<string, object>, CancellationToken, Task> abandonMessage,
Func<ServiceBusReceivedMessage, string, string, CancellationToken, Task> deadLetterMessage,
CancellationToken token)
{
// Process the message.
Logger.LogDebug("Received message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
Expand All @@ -160,29 +165,38 @@ protected async Task ProcessMessageAsyncInternal(ServiceBusReceivedMessage messa
// to avoid unnecessary exceptions.
Logger.LogDebug("Abandon message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await abandonMessage(message, null, token).ConfigureAwait(false);

return;
}

var r = await MessageProcessor.ProcessMessage(message, message.ApplicationProperties, cancellationToken: token).ConfigureAwait(false);
if (r.Exception != null)
switch (r.Result)
{
Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);

var messageProperties = new Dictionary<string, object>
{
// Set the exception message
["SMB.Exception"] = r.Exception.Message
};
await abandonMessage(message, messageProperties, token).ConfigureAwait(false);

return;
case ProcessResult.Success:
// Complete the message so that it is not received again.
// This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
Logger.LogDebug("Complete message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await completeMessage(message, token).ConfigureAwait(false);
return;

case ProcessResult.Abandon:
Logger.LogError(r.Exception, "Dead letter message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await deadLetterMessage(message, r.Exception?.GetType().Name ?? string.Empty, r.Exception?.Message ?? string.Empty, token).ConfigureAwait(false);
return;

case ProcessResult.Fail:
var messageProperties = new Dictionary<string, object>();
{
// Set the exception message
messageProperties.Add("SMB.Exception", r.Exception.Message);
}

Logger.LogError(r.Exception, "Abandon message (exception occurred while processing) - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await abandonMessage(message, messageProperties, token).ConfigureAwait(false);
return;

default:
throw new NotImplementedException();
}

// Complete the message so that it is not received again.
// This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
Logger.LogDebug("Complete message - Path: {Path}, SubscriptionName: {SubscriptionName}, SequenceNumber: {SequenceNumber}, DeliveryCount: {DeliveryCount}, MessageId: {MessageId}", TopicSubscription.Path, TopicSubscription.SubscriptionName, message.SequenceNumber, message.DeliveryCount, message.MessageId);
await completeMessage(message, token).ConfigureAwait(false);
}

protected Task ProcessErrorAsyncInternal(Exception exception, ServiceBusErrorSource errorSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface IServiceBusConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
public abstract class ServiceBusConsumerErrorHandler<T> : ConsumerErrorHandler<T>, IServiceBusConsumerErrorHandler<T>;
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

public interface IKafkaConsumerErrorHandler<in T> : IConsumerErrorHandler<T>;

public abstract class KafkaConsumerErrorHandler<T> : ConsumerErrorHandler<T>;
public abstract class KafkaConsumerErrorHandler<T> : ConsumerErrorHandler<T>, IKafkaConsumerErrorHandler<T>;
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ public async Task OnMessage(ConsumeResult message)
}

var r = await _messageProcessor.ProcessMessage(message, messageHeaders, cancellationToken: _cancellationTokenSource.Token).ConfigureAwait(false);
if (r.Result == ProcessResult.Abandon)
{
throw new NotSupportedException("Transport does not support abandoning messages");
}

if (r.Exception != null)
{
// The IKafkaConsumerErrorHandler and OnMessageFaulted was called at this point by the MessageProcessor.
Expand Down
Loading

0 comments on commit 15834b6

Please sign in to comment.