diff --git a/build.gradle b/build.gradle index b90d55d1..f994ca54 100644 --- a/build.gradle +++ b/build.gradle @@ -3,7 +3,7 @@ buildscript { mavenCentral() } dependencies { - classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.18' + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.19' classpath 'com.avast.gradle:gradle-docker-compose-plugin:0.15.2' classpath 'com.github.ben-manes:gradle-versions-plugin:0.39.0' } @@ -40,7 +40,7 @@ allprojects { catsEffectVersion = "2.5.4" fs2Version = "2.5.3" metricsVersion = "2.9.3" - protobufVersion = "3.20.1" + protobufVersion = "3.21.4" pureconfigVersion = "0.17.1" scalapbVersion = "0.11.8" scalapbJson4sVersion = "0.11.1" diff --git a/core/build.gradle b/core/build.gradle index 27a8140f..fb89a97b 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -22,7 +22,7 @@ dependencies { api "com.avast.bytes:bytes-core:${bytesVersion}" - api 'com.rabbitmq:amqp-client:5.14.2' + api 'com.rabbitmq:amqp-client:5.15.0' api "org.typelevel:cats-core_$scalaVersion:$catsVersion" api "org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion" diff --git a/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala b/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala index 0582c00e..f5442bb4 100644 --- a/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala +++ b/core/src/main/scala/com/avast/clients/rabbitmq/PoisonedMessageHandler.scala @@ -4,12 +4,13 @@ import cats.Applicative import cats.effect.{Resource, Sync} import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps} import com.avast.bytes.Bytes -import com.avast.clients.rabbitmq.PoisonedMessageHandler.defaultHandlePoisonedMessage +import com.avast.clients.rabbitmq.PoisonedMessageHandler.{defaultHandlePoisonedMessage, DiscardedTimeHeaderName} import com.avast.clients.rabbitmq.api.DeliveryResult.{Reject, Republish} import com.avast.clients.rabbitmq.api._ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger import com.avast.metrics.scalaeffectapi.Monitor +import java.time.Instant import scala.util.Try import scala.util.control.NonFatal @@ -78,7 +79,11 @@ object DeadQueuePoisonedMessageHandler { case None => CorrelationIdStrategy.RandomNew } - producer.send(dqpc.routingKey, rawBody, Some(d.properties))(cidStrategy) + val now = Instant.now() + + val finalProperties = d.properties.copy(headers = d.properties.headers.updated(DiscardedTimeHeaderName, now.toString)) + + producer.send(dqpc.routingKey, rawBody, Some(finalProperties))(cidStrategy) }) } } @@ -86,6 +91,7 @@ object DeadQueuePoisonedMessageHandler { object PoisonedMessageHandler { final val RepublishCountHeaderName: String = "X-Republish-Count" + final val DiscardedTimeHeaderName: String = "X-Discarded-Time" private[rabbitmq] def make[F[_]: Sync, A](config: Option[PoisonedMessageHandlingConfig], connection: RabbitMQConnection[F], @@ -140,7 +146,23 @@ object PoisonedMessageHandler { Applicative[F].pure( Republish(countAsPoisoned = true, newHeaders = newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef]))) } else { - handlePoisonedMessage(delivery, maxAttempts) + val now = Instant.now() + + def updateProperties(properties: MessageProperties): MessageProperties = { + properties.copy( + headers = properties.headers + .updated(DiscardedTimeHeaderName, now.toString) + .updated(RepublishCountHeaderName, maxAttempts.asInstanceOf[AnyRef])) + } + + val finalDelivery = delivery match { + case Delivery.Ok(body, properties, routingKey) => + Delivery.Ok(body, updateProperties(properties), routingKey) + case Delivery.MalformedContent(body, properties, routingKey, ce) => + Delivery.MalformedContent(body, updateProperties(properties), routingKey, ce) + } + + handlePoisonedMessage(finalDelivery, maxAttempts) .recoverWith { case NonFatal(e) => logger.warn(e)("Custom poisoned message handler failed") diff --git a/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala b/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala index d9c324c0..67dd3ec8 100644 --- a/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala +++ b/core/src/test/scala/com/avast/clients/rabbitmq/PoisonedMessageHandlerTest.scala @@ -8,6 +8,7 @@ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger import monix.eval.Task import monix.execution.Scheduler.Implicits.global +import java.time.Instant import java.util.concurrent.atomic.AtomicInteger class PoisonedMessageHandlerTest extends TestBase { @@ -111,6 +112,34 @@ class PoisonedMessageHandlerTest extends TestBase { assertResult(1)(movedCount.get()) } + test("DeadQueuePoisonedMessageHandler adds discarded time") { + def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { + Task.now(Republish()) + } + + val movedCount = new AtomicInteger(0) + + val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](3)({ (d, _, _) => + // test it's there and it can be parsed + assert(Instant.parse(d.properties.headers(DiscardedTimeHeaderName).asInstanceOf[String]).toEpochMilli > 0) + + Task.delay(movedCount.incrementAndGet()) + }) + + val properties = (1 to 2).foldLeft(MessageProperties.empty) { + case (p, _) => + run(handler, readAction, p) match { + case Republish(_, h) => MessageProperties(headers = h) + case _ => MessageProperties.empty + } + } + + assertResult(DeliveryResult.Reject)(run(handler, readAction, properties)) + + // if the assert above has failed, this won't assert + assertResult(1)(movedCount.get()) + } + test("pretend lower no. of attempts") { def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = { Task.now(Republish()) diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 92f06b50..8fad3f5a 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists