From f331ddf5312f31a07d38e5bff3ca1ed31238d41d Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 26 Dec 2024 15:57:18 -0800 Subject: [PATCH] Fix cancellation of RPC methods Fixes #1750 * Start by adding a test that reproduces the error. Give a 5ms cancellation to `BasicConsumeAsync`, with a much longer delay via Toxiproxy. If running in debug mode, you will see the same `task canceled` exception, but it does not propagate to the test itself. --- projects/Test/Integration/TestToxiproxy.cs | 42 ++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 507ac12e5..73fdfa4d1 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -36,6 +36,7 @@ using System.Threading.Tasks; using Integration; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using Toxiproxy.Net.Toxics; using Xunit; @@ -409,6 +410,47 @@ public async Task TestPublisherConfirmationThrottling() Assert.Equal(TotalMessageCount, publishCount); } + [SkippableFact] + [Trait("Category", "Toxiproxy")] + public async Task TestBasicConsumeCancellation_GH1750() + { + Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); + Assert.NotNull(_toxiproxyManager); + Assert.Null(_conn); + Assert.Null(_channel); + + ConnectionFactory cf = CreateConnectionFactory(); + cf.AutomaticRecoveryEnabled = false; + cf.TopologyRecoveryEnabled = false; + + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); + _conn = await cf.CreateConnectionAsync(); + _channel = await _conn.CreateChannelAsync(); + + QueueDeclareOk q = await _channel.QueueDeclareAsync(); + + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.ReceivedAsync += (o, a) => + { + return Task.CompletedTask; + }; + + string toxicName = $"rmq-localhost-delay-{Now}-{GenerateShortUuid()}"; + var latencyToxic = new LatencyToxic + { + Name = toxicName + }; + latencyToxic.Attributes.Latency = 500; + latencyToxic.Toxicity = 1.0; + latencyToxic.Stream = ToxicDirection.DownStream; + + Task addToxicTask = _toxiproxyManager.AddToxicAsync(latencyToxic); + await addToxicTask.WaitAsync(WaitSpan); + + using var cts = new CancellationTokenSource(5); + await _channel.BasicConsumeAsync(q.QueueName, true, consumer, cts.Token); + } + private bool AreToxiproxyTestsEnabled { get