Skip to content

Commit

Permalink
Add incomplete metric + integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Apr 3, 2024
1 parent e6df527 commit 73373de
Show file tree
Hide file tree
Showing 16 changed files with 235 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ object Enrich {
sinkBad(allBad, env.sinkBad, env.metrics.badCount),
if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big")
else Sync[F].unit,
sinkIncomplete(incompleteBytes, env.sinkIncomplete)
sinkIncomplete(incompleteBytes, env.sinkIncomplete, env.metrics.incompleteCount)
).parSequence_
}

Expand Down Expand Up @@ -292,10 +292,11 @@ object Enrich {

def sinkIncomplete[F[_]: Sync](
incomplete: List[AttributedData[Array[Byte]]],
maybeSink: Option[AttributedByteSink[F]]
maybeSink: Option[AttributedByteSink[F]],
incMetrics: Int => F[Unit]
): F[Unit] =
maybeSink match {
case Some(sink) => sink(incomplete)
case Some(sink) => sink(incomplete) *> incMetrics(incomplete.size)
case None => Sync[F].unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ object Environment {
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled))
metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled, incomplete.isDefined))
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http4s))
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
remoteAdapters <- prepareRemoteAdapters[F](file.remoteAdapters, metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ trait Metrics[F[_]] {
/** Increment bad events */
def badCount(nb: Int): F[Unit]

/** Increment incomplete events */
def incompleteCount(nb: Int): F[Unit]

/** Increment invalid enriched events count */
def invalidCount: F[Unit]

Expand All @@ -63,6 +66,7 @@ object Metrics {
val RawCounterName = "raw"
val GoodCounterName = "good"
val BadCounterName = "bad"
val IncompleteCounterName = "incomplete"
val InvalidCounterName = "invalid_enriched"
val RemoteAdaptersSuccessCounterName = "remote_adapters_success"
val RemoteAdaptersFailureCounterName = "remote_adapters_failure"
Expand All @@ -73,6 +77,7 @@ object Metrics {
rawCount: Int,
goodCount: Int,
badCount: Int,
incompleteCount: Option[Int],
invalidCount: Int,
remoteAdaptersSuccessCount: Option[Int],
remoteAdaptersFailureCount: Option[Int],
Expand All @@ -85,33 +90,35 @@ object Metrics {

def build[F[_]: Async](
config: MetricsReporters,
remoteAdaptersEnabled: Boolean
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): F[Metrics[F]] =
config match {
case MetricsReporters(None, None, _) => noop[F].pure[F]
case MetricsReporters(statsd, stdout, _) => impl[F](statsd, stdout, remoteAdaptersEnabled)
case MetricsReporters(statsd, stdout, _) => impl[F](statsd, stdout, remoteAdaptersEnabled, incompleteEventsEnabled)
}

private def impl[F[_]: Async](
statsd: Option[MetricsReporters.StatsD],
stdout: Option[MetricsReporters.Stdout],
remoteAdaptersEnabled: Boolean
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): F[Metrics[F]] =
for {
refsStatsd <- MetricRefs.init[F](remoteAdaptersEnabled)
refsStdout <- MetricRefs.init[F](remoteAdaptersEnabled)
refsStatsd <- MetricRefs.init[F](remoteAdaptersEnabled, incompleteEventsEnabled)
refsStdout <- MetricRefs.init[F](remoteAdaptersEnabled, incompleteEventsEnabled)
} yield new Metrics[F] {
def report: Stream[F, Unit] = {

val rep1 = statsd
.map { config =>
reporterStream(StatsDReporter.make[F](config), refsStatsd, config.period, remoteAdaptersEnabled)
reporterStream(StatsDReporter.make[F](config), refsStatsd, config.period, remoteAdaptersEnabled, incompleteEventsEnabled)
}
.getOrElse(Stream.never[F])

val rep2 = stdout
.map { config =>
reporterStream(Resource.eval(stdoutReporter(config)), refsStdout, config.period, remoteAdaptersEnabled)
reporterStream(Resource.eval(stdoutReporter(config)), refsStdout, config.period, remoteAdaptersEnabled, incompleteEventsEnabled)
}
.getOrElse(Stream.never[F])

Expand Down Expand Up @@ -142,6 +149,10 @@ object Metrics {
refsStatsd.badCount.update(_ + nb) *>
refsStdout.badCount.update(_ + nb)

def incompleteCount(nb: Int): F[Unit] =
refsStatsd.incompleteCount.update(_.map(_ + nb)) *>
refsStdout.incompleteCount.update(_.map(_ + nb))

def invalidCount: F[Unit] =
refsStatsd.invalidCount.update(_ + 1) *>
refsStdout.invalidCount.update(_ + 1)
Expand All @@ -161,19 +172,21 @@ object Metrics {
rawCount: Ref[F, Int],
goodCount: Ref[F, Int],
badCount: Ref[F, Int],
incompleteCount: Ref[F, Option[Int]],
invalidCount: Ref[F, Int],
remoteAdaptersSuccessCount: Ref[F, Option[Int]],
remoteAdaptersFailureCount: Ref[F, Option[Int]],
remoteAdaptersTimeoutCount: Ref[F, Option[Int]]
)

private object MetricRefs {
def init[F[_]: Sync](remoteAdaptersEnabled: Boolean): F[MetricRefs[F]] =
def init[F[_]: Sync](remoteAdaptersEnabled: Boolean, incompleteEventsEnabled: Boolean): F[MetricRefs[F]] =
for {
latency <- Ref.of[F, Option[Long]](None)
rawCounter <- Ref.of[F, Int](0)
goodCounter <- Ref.of[F, Int](0)
badCounter <- Ref.of[F, Int](0)
incompleteCounter <- Ref.of[F, Option[Int]](if (incompleteEventsEnabled) Some(0) else None)
invalidCounter <- Ref.of[F, Int](0)
remoteAdaptersSuccessCounter <- Ref.of[F, Option[Int]](if (remoteAdaptersEnabled) Some(0) else None)
remoteAdaptersFailureCounter <- Ref.of[F, Option[Int]](if (remoteAdaptersEnabled) Some(0) else None)
Expand All @@ -183,18 +196,24 @@ object Metrics {
rawCounter,
goodCounter,
badCounter,
incompleteCounter,
invalidCounter,
remoteAdaptersSuccessCounter,
remoteAdaptersFailureCounter,
remoteAdaptersTimeoutCounter
)

def snapshot[F[_]: Monad](refs: MetricRefs[F], remoteAdaptersEnabled: Boolean): F[MetricSnapshot] =
def snapshot[F[_]: Monad](
refs: MetricRefs[F],
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): F[MetricSnapshot] =
for {
latency <- refs.latency.getAndSet(None)
rawCount <- refs.rawCount.getAndSet(0)
goodCount <- refs.goodCount.getAndSet(0)
badCount <- refs.badCount.getAndSet(0)
incompleteCount <- refs.incompleteCount.getAndSet(if (incompleteEventsEnabled) Some(0) else None)
invalidCount <- refs.invalidCount.getAndSet(0)
remoteAdaptersSuccessCount <- refs.remoteAdaptersSuccessCount.getAndSet(if (remoteAdaptersEnabled) Some(0) else None)
remoteAdaptersFailureCount <- refs.remoteAdaptersFailureCount.getAndSet(if (remoteAdaptersEnabled) Some(0) else None)
Expand All @@ -203,6 +222,7 @@ object Metrics {
rawCount,
goodCount,
badCount,
incompleteCount,
invalidCount,
remoteAdaptersSuccessCount,
remoteAdaptersFailureCount,
Expand All @@ -214,12 +234,13 @@ object Metrics {
reporter: Resource[F, Reporter[F]],
metrics: MetricRefs[F],
period: FiniteDuration,
remoteAdaptersEnabled: Boolean
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): Stream[F, Unit] =
for {
rep <- Stream.resource(reporter)
_ <- Stream.fixedDelay[F](period)
snapshot <- Stream.eval(MetricRefs.snapshot(metrics, remoteAdaptersEnabled))
snapshot <- Stream.eval(MetricRefs.snapshot(metrics, remoteAdaptersEnabled, incompleteEventsEnabled))
_ <- Stream.eval(rep.report(snapshot))
} yield ()

Expand All @@ -234,6 +255,9 @@ object Metrics {
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, RawCounterName)} = ${snapshot.rawCount}")
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, GoodCounterName)} = ${snapshot.goodCount}")
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, BadCounterName)} = ${snapshot.badCount}")
_ <- snapshot.incompleteCount
.map(cnt => logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, IncompleteCounterName)} = $cnt"))
.getOrElse(Applicative[F].unit)
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, InvalidCounterName)} = ${snapshot.invalidCount}")
_ <- snapshot.enrichLatency
.map(latency => logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, LatencyGaugeName)} = $latency"))
Expand All @@ -257,6 +281,7 @@ object Metrics {
def rawCount(nb: Int): F[Unit] = Applicative[F].unit
def goodCount(nb: Int): F[Unit] = Applicative[F].unit
def badCount(nb: Int): F[Unit] = Applicative[F].unit
def incompleteCount(nb: Int): F[Unit] = Applicative[F].unit
def invalidCount: F[Unit] = Applicative[F].unit
def remoteAdaptersSuccessCount: F[Unit] = Applicative[F].unit
def remoteAdaptersFailureCount: F[Unit] = Applicative[F].unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ object StatsDReporter {
Metrics.BadCounterName -> snapshot.badCount.toString,
Metrics.InvalidCounterName -> snapshot.invalidCount.toString
) ++ snapshot.enrichLatency.map(l => Metrics.LatencyGaugeName -> l.toString) ++
snapshot.incompleteCount.map(cnt => Metrics.IncompleteCounterName -> cnt.toString) ++
snapshot.remoteAdaptersSuccessCount.map(cnt => Metrics.RemoteAdaptersSuccessCounterName -> cnt.toString) ++
snapshot.remoteAdaptersFailureCount.map(cnt => Metrics.RemoteAdaptersFailureCounterName -> cnt.toString) ++
snapshot.remoteAdaptersTimeoutCount.map(cnt => Metrics.RemoteAdaptersTimeoutCounterName -> cnt.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Processor, Payload => BadRowPayload}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.IpLookupsEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{IpLookupsEnrichment, JavascriptScriptEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, MiscEnrichments}
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
Expand Down Expand Up @@ -187,21 +187,55 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck {
}

"enrich" should {
"update metrics with raw, good and bad counters" in {
val input = Stream.emits(List(Array.empty[Byte], EnrichSpec.payload))
TestEnvironment.make(input).use { test =>
"update metrics with raw, good, bad and incomplete counters" in {
val script = """
function process(event, params) {
if(event.getUser_ipaddress() == "foo") {
throw "BOOM";
}
return [ ];
}"""
val config = json"""{
"parameters": {
"script": ${ConversionUtils.encodeBase64Url(script)}
}
}"""
val schemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"javascript_script_config",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
val jsEnrichConf =
JavascriptScriptEnrichment.parse(config, schemaKey).toOption.get

val context = EnrichSpec.context.copy(ipAddress = Some("foo"))
val payload = EnrichSpec.collectorPayload.copy(context = context)

val input = Stream.emits(
List(
Array.empty[Byte],
EnrichSpec.payload,
payload.toRaw
)
)

TestEnvironment.make(input, List(jsEnrichConf)).use { test =>
val enrichStream = Enrich.run[IO, Array[Byte]](test.env)
for {
_ <- enrichStream.compile.drain
bad <- test.bad
good <- test.good
incomplete <- test.incomplete
counter <- test.counter.get
} yield {
(counter.raw must_== 2L)
(counter.raw must_== 3L)
(counter.good must_== 1L)
(counter.bad must_== 1L)
(bad.size must_== 1)
(counter.bad must_== 2L)
(counter.incomplete must_== 1L)
(bad.size must_== 2)
(good.size must_== 1)
(incomplete.size must_== 1)
}
}
}
Expand Down Expand Up @@ -249,9 +283,10 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck {
test
.run(_.copy(assetsUpdatePeriod = Some(1800.millis)))
.map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
(good must contain(exactly(one, two)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {
val testWithHttp = HttpServer.resource *> TestEnvironment.make(input, List(enrichment))
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
bad must beEmpty
pii must beEmpty
incomplete must beEmpty
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
Expand Down Expand Up @@ -117,9 +118,10 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {

TestEnvironment.make(input, List(enrichment)).use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
good must beEmpty
pii must beEmpty
incomplete must haveSize(nbEvents)
bad.collect { case ef: BadRow.EnrichmentFailures => ef } must haveSize(nbEvents)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ class IabEnrichmentSpec extends Specification with CatsEffect {
val testWithHttp = HttpServer.resource *> TestEnvironment.make(input, List(IabEnrichmentSpec.enrichmentConf))
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
Expand Down Expand Up @@ -95,9 +96,10 @@ class IabEnrichmentSpec extends Specification with CatsEffect {
val testWithHttp = HttpServer.resource *> TestEnvironment.make(input, List(IabEnrichmentSpec.enrichmentConf))
testWithHttp.use { test =>
test.run(_.copy(assetsUpdatePeriod = Some(1800.millis))).map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
good.map(_.derived_contexts) must contain(exactly(expectedOne, expectedTwo))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ class YauaaEnrichmentSpec extends Specification with CatsEffect {

TestEnvironment.make(input, List(enrichment)).use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class StatsDReporterSpec extends Specification {

"StatsDeporter" should {
"serialize metrics" in {
val snapshot = Metrics.MetricSnapshot(Some(10000L), 10, 20, 30, 0, Some(40), Some(0), Some(0))
val snapshot = Metrics.MetricSnapshot(Some(10000L), 10, 20, 30, Some(25), 0, Some(40), Some(0), Some(0))

val result = StatsDReporter.serializedMetrics(snapshot, TestConfig)

Expand All @@ -30,6 +30,7 @@ class StatsDReporterSpec extends Specification {
"snowplow.test.raw:10|c|#tag1:abc",
"snowplow.test.good:20|c|#tag1:abc",
"snowplow.test.bad:30|c|#tag1:abc",
"snowplow.test.incomplete:25|c|#tag1:abc",
"snowplow.test.latency:10000|g|#tag1:abc",
"snowplow.test.invalid_enriched:0|c|#tag1:abc",
"snowplow.test.remote_adapters_success:40|c|#tag1:abc",
Expand All @@ -40,7 +41,7 @@ class StatsDReporterSpec extends Specification {
}

"serialize metrics when latency is empty" in {
val snapshot = Metrics.MetricSnapshot(None, 10, 20, 30, 40, Some(40), Some(0), Some(0))
val snapshot = Metrics.MetricSnapshot(None, 10, 20, 30, None, 40, Some(40), Some(0), Some(0))

val result = StatsDReporter.serializedMetrics(snapshot, TestConfig)

Expand All @@ -58,7 +59,7 @@ class StatsDReporterSpec extends Specification {
}

"serialize metrics when remote adapter metrics are empty" in {
val snapshot = Metrics.MetricSnapshot(Some(10000L), 10, 20, 30, 40, None, None, None)
val snapshot = Metrics.MetricSnapshot(Some(10000L), 10, 20, 30, Some(25), 40, None, None, None)

val result = StatsDReporter.serializedMetrics(snapshot, TestConfig)

Expand All @@ -67,6 +68,7 @@ class StatsDReporterSpec extends Specification {
"snowplow.test.raw:10|c|#tag1:abc",
"snowplow.test.good:20|c|#tag1:abc",
"snowplow.test.bad:30|c|#tag1:abc",
"snowplow.test.incomplete:25|c|#tag1:abc",
"snowplow.test.latency:10000|g|#tag1:abc",
"snowplow.test.invalid_enriched:40|c|#tag1:abc"
)
Expand Down
Loading

0 comments on commit 73373de

Please sign in to comment.