Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host] Consumer Circuit breaker #353

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ SlimMessageBus is a client façade for message brokers for .NET. It comes with i
| `.Host.Outbox.Sql` | Transactional Outbox using MSSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) |
| `.Host.Outbox.Sql.DbContext` | Transactional Outbox using MSSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) |
| `.Host.AsyncApi` | [AsyncAPI](https://www.asyncapi.com/) specification generation via [Saunter](https://github.com/tehmantra/saunter) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AsyncApi.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AsyncApi) |
| `.Host.CircuitBreaker.HealthCheck` | Consumer circuit breaker based on [health checks](docs/intro.md#health-check-circuit-breaker) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.CircuitBreaker.HealthCheck.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.CircuitBreaker.HealthCheck) |

Typically the application layers (domain model, business logic) only need to depend on `SlimMessageBus` which is the facade, and ultimately the application hosting layer (ASP.NET, Console App, Windows Service) will reference and configure the other packages (`SlimMessageBus.Host.*`) which are the messaging transport providers and additional plugins.

Expand Down
3 changes: 3 additions & 0 deletions build/tasks.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ $projects = @(
"SlimMessageBus.Host.Outbox.Sql",
"SlimMessageBus.Host.Outbox.Sql.DbContext",

"SlimMessageBus.Host.CircuitBreaker",
"SlimMessageBus.Host.CircuitBreaker.HealthCheck",

"SlimMessageBus.Host.AsyncApi"
)

Expand Down
1 change: 1 addition & 0 deletions docs/NuGet.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ Plugins:
- Transactional Outbox pattern (SQL, DbContext)
- Serialization using JSON, Avro, ProtoBuf
- AsyncAPI specification generation
- Consumer Circuit Breaker based on Health Checks

Find out more [https://github.com/zarusz/SlimMessageBus](https://github.com/zarusz/SlimMessageBus).
27 changes: 27 additions & 0 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Health check circuit breaker](#health-check-circuit-breaker)
- [Consumer context (additional message information)](#consumer-context-additional-message-information)
- [Per-message DI container scope](#per-message-di-container-scope)
- [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse)
Expand Down Expand Up @@ -291,6 +292,32 @@ await consumerControl.Stop();

> Since version 1.15.5

#### Health check circuit breaker

Consumers can be linked to [.NET app health checks](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/diagnostic-health-checks) [tags](https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks#register-health-check-services), enabling or disabling the consumer based on the health check status reported by the [Health Check Publisher](https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks#health-check-publisher). A consumer associated with one or more tags will only be active if all health checks linked to the tags are passing.

```cs
// add health checks with tags
builder.Services
.AddHealthChecks()
.AddCheck<StorageHealthCheck>("Storage", tags: ["Storage"]);
.AddCheck<SqlServerHealthCheck>("SqlServer", tags: ["Sql"]);

builder.Services
.AddSlimMessageBus(mbb => {
...

mbb.Consume<Message>(cfg => {
...

// configure consumer to monitor tag/state
cfg.PauseOnUnhealthyCheck("Storage");
cfg.PauseOnDegradedHealthCheck("Sql");
})
})
```
*Requires: SlimMessageBus.Host.CircuitBreaker.HealthCheck*

#### Consumer context (additional message information)

The consumer can access the [`IConsumerContext`](../src/SlimMessageBus/IConsumerContext.cs) object which:
Expand Down
27 changes: 27 additions & 0 deletions docs/intro.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [Set message headers](#set-message-headers)
- [Consumer](#consumer)
- [Start or Stop message consumption](#start-or-stop-message-consumption)
- [Health check circuit breaker](#health-check-circuit-breaker)
- [Consumer context (additional message information)](#consumer-context-additional-message-information)
- [Per-message DI container scope](#per-message-di-container-scope)
- [Hybrid bus and message scope reuse](#hybrid-bus-and-message-scope-reuse)
Expand Down Expand Up @@ -291,6 +292,32 @@ await consumerControl.Stop();

> Since version 1.15.5

#### Health check circuit breaker

Consumers can be linked to [.NET app health checks](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/diagnostic-health-checks) [tags](https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks#register-health-check-services), enabling or disabling the consumer based on the health check status reported by the [Health Check Publisher](https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks#health-check-publisher). A consumer associated with one or more tags will only be active if all health checks linked to the tags are passing.

```cs
// add health checks with tags
builder.Services
.AddHealthChecks()
.AddCheck<StorageHealthCheck>("Storage", tags: ["Storage"]);
.AddCheck<SqlServerHealthCheck>("SqlServer", tags: ["Sql"]);

builder.Services
.AddSlimMessageBus(mbb => {
...

mbb.Consume<Message>(cfg => {
...

// configure consumer to monitor tag/state
cfg.PauseOnUnhealthyCheck("Storage");
cfg.PauseOnDegradedHealthCheck("Sql");
})
})
```
*Requires: SlimMessageBus.Host.CircuitBreaker.HealthCheck*

#### Consumer context (additional message information)

The consumer can access the [`IConsumerContext`](../src/SlimMessageBus/IConsumerContext.cs) object which:
Expand Down
3 changes: 3 additions & 0 deletions src/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,14 @@ dotnet_style_allow_multiple_blank_lines_experimental = true:silent
dotnet_style_allow_statement_immediately_after_block_experimental = true:silent
dotnet_style_prefer_collection_expression = when_types_loosely_match:suggestion
dotnet_diagnostic.CA1859.severity = silent

dotnet_style_qualification_for_field = false:suggestion
dotnet_style_qualification_for_property = false:suggestion
dotnet_style_qualification_for_method = false:suggestion
dotnet_style_qualification_for_event = false:suggestion
dotnet_diagnostic.VSTHRD200.severity = none
# not supported by .netstandard2.0
dotnet_diagnostic.CA1510.severity = none

[*.{csproj,xml}]
indent_style = space
Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>3.0.0-rc901</Version>
<Version>3.0.0-rc902</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class AddConsumer(ILogger<AddConsumer> logger) : IConsumer<Add>
{
public Task OnHandle(Add message, CancellationToken cancellationToken)
{
logger.LogInformation("{A} + {B} = {C}", message.A, message.B, message.A + message.B);
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Sample.CircuitBreaker.HealthCheck.Consumers;

public class SubtractConsumer(ILogger<SubtractConsumer> logger) : IConsumer<Subtract>
{
public Task OnHandle(Subtract message, CancellationToken cancellationToken)
{
logger.LogInformation("{A} - {B} = {C}", message.A, message.B, message.A - message.B);
return Task.CompletedTask;
}
}
17 changes: 17 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
global using System.Net.Mime;
global using System.Reflection;

global using Microsoft.Extensions.Configuration;
global using Microsoft.Extensions.DependencyInjection;
global using Microsoft.Extensions.Hosting;
global using Microsoft.Extensions.Logging;

global using Sample.CircuitBreaker.HealthCheck.Consumers;
global using Sample.CircuitBreaker.HealthCheck.Models;

global using SecretStore;

global using SlimMessageBus;
global using SlimMessageBus.Host;
global using SlimMessageBus.Host.RabbitMQ;
global using SlimMessageBus.Host.Serialization.SystemTextJson;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

public class AddRandomHealthCheck(ILogger<AddRandomHealthCheck> logger) : RandomHealthCheck(logger)
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

using Microsoft.Extensions.Diagnostics.HealthChecks;

public abstract class RandomHealthCheck(ILogger logger) : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
var value = (HealthStatus)Random.Shared.Next(3);
logger.LogInformation("{HealthCheck} evaluated as {HealthStatus}", GetType(), value);
return Task.FromResult(new HealthCheckResult(value, value.ToString()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace Sample.CircuitBreaker.HealthCheck.HealthChecks;

public class SubtractRandomHealthCheck(ILogger<SubtractRandomHealthCheck> logger) : RandomHealthCheck(logger)
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace Sample.CircuitBreaker.HealthCheck;

public class IntermittentMessagePublisher(ILogger<IntermittentMessagePublisher> logger, IMessageBus messageBus)
: BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var a = Random.Shared.Next(10);
var b = Random.Shared.Next(10);

logger.LogInformation("Emitting {A} +- {B} = ?", a, b);

await Task.WhenAll(
messageBus.Publish(new Add(a, b), cancellationToken: stoppingToken),
messageBus.Publish(new Subtract(a, b), cancellationToken: stoppingToken),
Task.Delay(1000, stoppingToken));
}
}
}
3 changes: 3 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/Models/Add.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Add(int A, int B);
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Sample.CircuitBreaker.HealthCheck.Models;

public record Subtract(int A, int B);
92 changes: 92 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
namespace Sample.CircuitBreaker.HealthCheck;

using Microsoft.Extensions.Diagnostics.HealthChecks;

using Sample.CircuitBreaker.HealthCheck.HealthChecks;

using SlimMessageBus.Host.CircuitBreaker.HealthCheck;

public static class Program
{
private static async Task Main(string[] args)
{
// Local file with secrets
Secrets.Load(@"..\..\..\..\..\secrets.txt");

await Host.CreateDefaultBuilder(args)
.ConfigureServices((builder, services) =>
{
const string AddTag = "add";
const string SubtractTag = "subtract";

services.AddSlimMessageBus(mbb =>
{
var ticks = DateTimeOffset.UtcNow.Ticks;
var addTopic = $"Sample-CircuitBreaker-HealthCheck-add-{ticks}";
var subtractTopic = $"Sample-CircuitBreaker-HealthCheck-subtract-{ticks}";

mbb
.WithProviderRabbitMQ(
cfg =>
{
cfg.ConnectionString = Secrets.Service.PopulateSecrets(builder.Configuration.GetValue<string>("RabbitMQ:ConnectionString"));
cfg.ConnectionFactory.ClientProvidedName = $"Sample_CircuitBreaker_HealthCheck_{Environment.MachineName}";

cfg.UseMessagePropertiesModifier((m, p) => p.ContentType = MediaTypeNames.Application.Json);
cfg.UseExchangeDefaults(durable: false);
cfg.UseQueueDefaults(durable: false);
});
mbb
.Produce<Add>(x => x
.Exchange(addTopic, exchangeType: ExchangeType.Fanout, autoDelete: false)
.RoutingKeyProvider((m, p) => Guid.NewGuid().ToString()))
.Consume<Add>(
cfg =>
{
cfg
.Queue(nameof(Add), autoDelete: false)
.Path(nameof(Add))
.ExchangeBinding(addTopic)
.WithConsumer<AddConsumer>()
.PauseOnDegradedHealthCheck(AddTag);
});

mbb
.Produce<Subtract>(x => x
.Exchange(subtractTopic, exchangeType: ExchangeType.Fanout, autoDelete: false)
.RoutingKeyProvider((m, p) => Guid.NewGuid().ToString()))
.Consume<Subtract>(
cfg =>
{
cfg
.Queue(nameof(Subtract), autoDelete: false)
.Path(nameof(Subtract))
.ExchangeBinding(subtractTopic)
.WithConsumer<SubtractConsumer>()
.PauseOnUnhealthyCheck(SubtractTag);
});

mbb.AddServicesFromAssembly(Assembly.GetExecutingAssembly());
mbb.AddJsonSerializer();
});

services.AddHostedService<IntermittentMessagePublisher>();
services.AddSingleton<AddRandomHealthCheck>();
services.AddSingleton<SubtractRandomHealthCheck>();

services.Configure<HealthCheckPublisherOptions>(cfg =>
{
// aggressive to toggle health status often (sample only)
cfg.Delay = TimeSpan.FromSeconds(3);
cfg.Period = TimeSpan.FromSeconds(5);
});

services
.AddHealthChecks()
.AddCheck<AddRandomHealthCheck>("Add", tags: [AddTag])
.AddCheck<SubtractRandomHealthCheck>("Subtract", tags: [SubtractTag]);
})
.Build()
.RunAsync();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" Version="8.0.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\SlimMessageBus.Host.CircuitBreaker.HealthCheck\SlimMessageBus.Host.CircuitBreaker.HealthCheck.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.RabbitMQ\SlimMessageBus.Host.RabbitMQ.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host\SlimMessageBus.Host.csproj" />
<ProjectReference Include="..\..\SlimMessageBus.Host.Serialization.SystemTextJson\SlimMessageBus.Host.Serialization.SystemTextJson.csproj" />
<ProjectReference Include="..\..\SlimMessageBus\SlimMessageBus.csproj" />
<ProjectReference Include="..\..\Tools\SecretStore\SecretStore.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
<ExcludeFromSingleFile>true</ExcludeFromSingleFile>
<CopyToPublishDirectory>PreserveNewest</CopyToPublishDirectory>
</None>
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions src/Samples/Sample.CircuitBreaker.HealthCheck/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
},
"Console": {
"FormatterName": "simple",
"FormatterOptions": {
"SingleLine": true,
"TimestampFormat": "HH:mm:ss.fff "
}
}
},
"RabbitMQ": {
"ConnectionString": "{{rabbitmq_connectionstring}}"
}
}
Loading
Loading