Skip to content

Commit

Permalink
Merge pull request #188 from avast/TimestampIntoPoisoned
Browse files Browse the repository at this point in the history
Adding timestamp to messages moved to the dead queue
  • Loading branch information
jendakol authored Aug 8, 2022
2 parents a8efc9f + ff321fb commit b88bfe4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 7 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ buildscript {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.18'
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.19'
classpath 'com.avast.gradle:gradle-docker-compose-plugin:0.15.2'
classpath 'com.github.ben-manes:gradle-versions-plugin:0.39.0'
}
Expand Down Expand Up @@ -40,7 +40,7 @@ allprojects {
catsEffectVersion = "2.5.4"
fs2Version = "2.5.3"
metricsVersion = "2.9.3"
protobufVersion = "3.20.1"
protobufVersion = "3.21.4"
pureconfigVersion = "0.17.1"
scalapbVersion = "0.11.8"
scalapbJson4sVersion = "0.11.1"
Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ dependencies {

api "com.avast.bytes:bytes-core:${bytesVersion}"

api 'com.rabbitmq:amqp-client:5.14.2'
api 'com.rabbitmq:amqp-client:5.15.0'

api "org.typelevel:cats-core_$scalaVersion:$catsVersion"
api "org.typelevel:cats-effect_$scalaVersion:$catsEffectVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import cats.Applicative
import cats.effect.{Resource, Sync}
import cats.implicits.{catsSyntaxApplicativeError, catsSyntaxFlatMapOps, toFunctorOps}
import com.avast.bytes.Bytes
import com.avast.clients.rabbitmq.PoisonedMessageHandler.defaultHandlePoisonedMessage
import com.avast.clients.rabbitmq.PoisonedMessageHandler.{defaultHandlePoisonedMessage, DiscardedTimeHeaderName}
import com.avast.clients.rabbitmq.api.DeliveryResult.{Reject, Republish}
import com.avast.clients.rabbitmq.api._
import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import com.avast.metrics.scalaeffectapi.Monitor

import java.time.Instant
import scala.util.Try
import scala.util.control.NonFatal

Expand Down Expand Up @@ -78,14 +79,19 @@ object DeadQueuePoisonedMessageHandler {
case None => CorrelationIdStrategy.RandomNew
}

producer.send(dqpc.routingKey, rawBody, Some(d.properties))(cidStrategy)
val now = Instant.now()

val finalProperties = d.properties.copy(headers = d.properties.headers.updated(DiscardedTimeHeaderName, now.toString))

producer.send(dqpc.routingKey, rawBody, Some(finalProperties))(cidStrategy)
})
}
}
}

object PoisonedMessageHandler {
final val RepublishCountHeaderName: String = "X-Republish-Count"
final val DiscardedTimeHeaderName: String = "X-Discarded-Time"

private[rabbitmq] def make[F[_]: Sync, A](config: Option[PoisonedMessageHandlingConfig],
connection: RabbitMQConnection[F],
Expand Down Expand Up @@ -140,7 +146,23 @@ object PoisonedMessageHandler {
Applicative[F].pure(
Republish(countAsPoisoned = true, newHeaders = newHeaders + (RepublishCountHeaderName -> attempt.asInstanceOf[AnyRef])))
} else {
handlePoisonedMessage(delivery, maxAttempts)
val now = Instant.now()

def updateProperties(properties: MessageProperties): MessageProperties = {
properties.copy(
headers = properties.headers
.updated(DiscardedTimeHeaderName, now.toString)
.updated(RepublishCountHeaderName, maxAttempts.asInstanceOf[AnyRef]))
}

val finalDelivery = delivery match {
case Delivery.Ok(body, properties, routingKey) =>
Delivery.Ok(body, updateProperties(properties), routingKey)
case Delivery.MalformedContent(body, properties, routingKey, ce) =>
Delivery.MalformedContent(body, updateProperties(properties), routingKey, ce)
}

handlePoisonedMessage(finalDelivery, maxAttempts)
.recoverWith {
case NonFatal(e) =>
logger.warn(e)("Custom poisoned message handler failed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.avast.clients.rabbitmq.logging.ImplicitContextLogger
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global

import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger

class PoisonedMessageHandlerTest extends TestBase {
Expand Down Expand Up @@ -111,6 +112,34 @@ class PoisonedMessageHandlerTest extends TestBase {
assertResult(1)(movedCount.get())
}

test("DeadQueuePoisonedMessageHandler adds discarded time") {
def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = {
Task.now(Republish())
}

val movedCount = new AtomicInteger(0)

val handler = new DeadQueuePoisonedMessageHandler[Task, Bytes](3)({ (d, _, _) =>
// test it's there and it can be parsed
assert(Instant.parse(d.properties.headers(DiscardedTimeHeaderName).asInstanceOf[String]).toEpochMilli > 0)

Task.delay(movedCount.incrementAndGet())
})

val properties = (1 to 2).foldLeft(MessageProperties.empty) {
case (p, _) =>
run(handler, readAction, p) match {
case Republish(_, h) => MessageProperties(headers = h)
case _ => MessageProperties.empty
}
}

assertResult(DeliveryResult.Reject)(run(handler, readAction, properties))

// if the assert above has failed, this won't assert
assertResult(1)(movedCount.get())
}

test("pretend lower no. of attempts") {
def readAction(d: Delivery[Bytes]): Task[DeliveryResult] = {
Task.now(Republish())
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

0 comments on commit b88bfe4

Please sign in to comment.