Skip to content

Commit

Permalink
Add incomplete metric + integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Apr 9, 2024
1 parent 3fd3690 commit ebe793f
Show file tree
Hide file tree
Showing 20 changed files with 403 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
env:
PGPASSWORD: supersecret1
- name: Run tests
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" TESTCONTAINERS_RYUK_DISABLED=true sbt coverage +test
env:
OER_KEY: ${{ secrets.OER_KEY }}
- name: Check Scala formatting
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ lazy val kinesisDistroless = project
.settings(libraryDependencies ++= kinesisDependencies ++ Seq(
// integration tests dependencies
specs2CEIt,
testContainersIt
testContainersIt,
dockerJavaIt
))
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ object Enrich {
sinkBad(allBad, env.sinkBad, env.metrics.badCount),
if (incompleteTooBig.nonEmpty) Logger[F].warn(s"${incompleteTooBig.size} incomplete events discarded because they are too big")
else Sync[F].unit,
sinkIncomplete(incompleteBytes, env.sinkIncomplete)
sinkIncomplete(incompleteBytes, env.sinkIncomplete, env.metrics.incompleteCount)
).parSequence_
}

Expand Down Expand Up @@ -292,10 +292,11 @@ object Enrich {

def sinkIncomplete[F[_]: Sync](
incomplete: List[AttributedData[Array[Byte]]],
maybeSink: Option[AttributedByteSink[F]]
maybeSink: Option[AttributedByteSink[F]],
incMetrics: Int => F[Unit]
): F[Unit] =
maybeSink match {
case Some(sink) => sink(incomplete)
case Some(sink) => sink(incomplete) *> incMetrics(incomplete.size)
case None => Sync[F].unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ object Environment {
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled))
metrics <- Resource.eval(Metrics.build[F](file.monitoring.metrics, remoteAdaptersEnabled, incomplete.isDefined))
metadata <- Resource.eval(metadataReporter[F](file, processor.artifact, http4s))
assets = parsedConfigs.enrichmentConfigs.flatMap(_.filesToCache)
remoteAdapters <- prepareRemoteAdapters[F](file.remoteAdapters, metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ trait Metrics[F[_]] {
/** Increment bad events */
def badCount(nb: Int): F[Unit]

/** Increment incomplete events */
def incompleteCount(nb: Int): F[Unit]

/** Increment invalid enriched events count */
def invalidCount: F[Unit]

Expand All @@ -63,6 +66,7 @@ object Metrics {
val RawCounterName = "raw"
val GoodCounterName = "good"
val BadCounterName = "bad"
val IncompleteCounterName = "incomplete"
val InvalidCounterName = "invalid_enriched"
val RemoteAdaptersSuccessCounterName = "remote_adapters_success"
val RemoteAdaptersFailureCounterName = "remote_adapters_failure"
Expand All @@ -73,6 +77,7 @@ object Metrics {
rawCount: Int,
goodCount: Int,
badCount: Int,
incompleteCount: Option[Int],
invalidCount: Int,
remoteAdaptersSuccessCount: Option[Int],
remoteAdaptersFailureCount: Option[Int],
Expand All @@ -85,33 +90,35 @@ object Metrics {

def build[F[_]: Async](
config: MetricsReporters,
remoteAdaptersEnabled: Boolean
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): F[Metrics[F]] =
config match {
case MetricsReporters(None, None, _) => noop[F].pure[F]
case MetricsReporters(statsd, stdout, _) => impl[F](statsd, stdout, remoteAdaptersEnabled)
case MetricsReporters(statsd, stdout, _) => impl[F](statsd, stdout, remoteAdaptersEnabled, incompleteEventsEnabled)
}

private def impl[F[_]: Async](
statsd: Option[MetricsReporters.StatsD],
stdout: Option[MetricsReporters.Stdout],
remoteAdaptersEnabled: Boolean
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): F[Metrics[F]] =
for {
refsStatsd <- MetricRefs.init[F](remoteAdaptersEnabled)
refsStdout <- MetricRefs.init[F](remoteAdaptersEnabled)
refsStatsd <- MetricRefs.init[F](remoteAdaptersEnabled, incompleteEventsEnabled)
refsStdout <- MetricRefs.init[F](remoteAdaptersEnabled, incompleteEventsEnabled)
} yield new Metrics[F] {
def report: Stream[F, Unit] = {

val rep1 = statsd
.map { config =>
reporterStream(StatsDReporter.make[F](config), refsStatsd, config.period, remoteAdaptersEnabled)
reporterStream(StatsDReporter.make[F](config), refsStatsd, config.period, remoteAdaptersEnabled, incompleteEventsEnabled)
}
.getOrElse(Stream.never[F])

val rep2 = stdout
.map { config =>
reporterStream(Resource.eval(stdoutReporter(config)), refsStdout, config.period, remoteAdaptersEnabled)
reporterStream(Resource.eval(stdoutReporter(config)), refsStdout, config.period, remoteAdaptersEnabled, incompleteEventsEnabled)
}
.getOrElse(Stream.never[F])

Expand Down Expand Up @@ -142,6 +149,10 @@ object Metrics {
refsStatsd.badCount.update(_ + nb) *>
refsStdout.badCount.update(_ + nb)

def incompleteCount(nb: Int): F[Unit] =
refsStatsd.incompleteCount.update(_.map(_ + nb)) *>
refsStdout.incompleteCount.update(_.map(_ + nb))

def invalidCount: F[Unit] =
refsStatsd.invalidCount.update(_ + 1) *>
refsStdout.invalidCount.update(_ + 1)
Expand All @@ -161,19 +172,21 @@ object Metrics {
rawCount: Ref[F, Int],
goodCount: Ref[F, Int],
badCount: Ref[F, Int],
incompleteCount: Ref[F, Option[Int]],
invalidCount: Ref[F, Int],
remoteAdaptersSuccessCount: Ref[F, Option[Int]],
remoteAdaptersFailureCount: Ref[F, Option[Int]],
remoteAdaptersTimeoutCount: Ref[F, Option[Int]]
)

private object MetricRefs {
def init[F[_]: Sync](remoteAdaptersEnabled: Boolean): F[MetricRefs[F]] =
def init[F[_]: Sync](remoteAdaptersEnabled: Boolean, incompleteEventsEnabled: Boolean): F[MetricRefs[F]] =
for {
latency <- Ref.of[F, Option[Long]](None)
rawCounter <- Ref.of[F, Int](0)
goodCounter <- Ref.of[F, Int](0)
badCounter <- Ref.of[F, Int](0)
incompleteCounter <- Ref.of[F, Option[Int]](if (incompleteEventsEnabled) Some(0) else None)
invalidCounter <- Ref.of[F, Int](0)
remoteAdaptersSuccessCounter <- Ref.of[F, Option[Int]](if (remoteAdaptersEnabled) Some(0) else None)
remoteAdaptersFailureCounter <- Ref.of[F, Option[Int]](if (remoteAdaptersEnabled) Some(0) else None)
Expand All @@ -183,18 +196,24 @@ object Metrics {
rawCounter,
goodCounter,
badCounter,
incompleteCounter,
invalidCounter,
remoteAdaptersSuccessCounter,
remoteAdaptersFailureCounter,
remoteAdaptersTimeoutCounter
)

def snapshot[F[_]: Monad](refs: MetricRefs[F], remoteAdaptersEnabled: Boolean): F[MetricSnapshot] =
def snapshot[F[_]: Monad](
refs: MetricRefs[F],
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): F[MetricSnapshot] =
for {
latency <- refs.latency.getAndSet(None)
rawCount <- refs.rawCount.getAndSet(0)
goodCount <- refs.goodCount.getAndSet(0)
badCount <- refs.badCount.getAndSet(0)
incompleteCount <- refs.incompleteCount.getAndSet(if (incompleteEventsEnabled) Some(0) else None)
invalidCount <- refs.invalidCount.getAndSet(0)
remoteAdaptersSuccessCount <- refs.remoteAdaptersSuccessCount.getAndSet(if (remoteAdaptersEnabled) Some(0) else None)
remoteAdaptersFailureCount <- refs.remoteAdaptersFailureCount.getAndSet(if (remoteAdaptersEnabled) Some(0) else None)
Expand All @@ -203,6 +222,7 @@ object Metrics {
rawCount,
goodCount,
badCount,
incompleteCount,
invalidCount,
remoteAdaptersSuccessCount,
remoteAdaptersFailureCount,
Expand All @@ -214,12 +234,13 @@ object Metrics {
reporter: Resource[F, Reporter[F]],
metrics: MetricRefs[F],
period: FiniteDuration,
remoteAdaptersEnabled: Boolean
remoteAdaptersEnabled: Boolean,
incompleteEventsEnabled: Boolean
): Stream[F, Unit] =
for {
rep <- Stream.resource(reporter)
_ <- Stream.fixedDelay[F](period)
snapshot <- Stream.eval(MetricRefs.snapshot(metrics, remoteAdaptersEnabled))
snapshot <- Stream.eval(MetricRefs.snapshot(metrics, remoteAdaptersEnabled, incompleteEventsEnabled))
_ <- Stream.eval(rep.report(snapshot))
} yield ()

Expand All @@ -234,6 +255,9 @@ object Metrics {
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, RawCounterName)} = ${snapshot.rawCount}")
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, GoodCounterName)} = ${snapshot.goodCount}")
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, BadCounterName)} = ${snapshot.badCount}")
_ <- snapshot.incompleteCount
.map(cnt => logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, IncompleteCounterName)} = $cnt"))
.getOrElse(Applicative[F].unit)
_ <- logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, InvalidCounterName)} = ${snapshot.invalidCount}")
_ <- snapshot.enrichLatency
.map(latency => logger.info(s"${MetricsReporters.normalizeMetric(config.prefix, LatencyGaugeName)} = $latency"))
Expand All @@ -257,6 +281,7 @@ object Metrics {
def rawCount(nb: Int): F[Unit] = Applicative[F].unit
def goodCount(nb: Int): F[Unit] = Applicative[F].unit
def badCount(nb: Int): F[Unit] = Applicative[F].unit
def incompleteCount(nb: Int): F[Unit] = Applicative[F].unit
def invalidCount: F[Unit] = Applicative[F].unit
def remoteAdaptersSuccessCount: F[Unit] = Applicative[F].unit
def remoteAdaptersFailureCount: F[Unit] = Applicative[F].unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ object StatsDReporter {
Metrics.BadCounterName -> snapshot.badCount.toString,
Metrics.InvalidCounterName -> snapshot.invalidCount.toString
) ++ snapshot.enrichLatency.map(l => Metrics.LatencyGaugeName -> l.toString) ++
snapshot.incompleteCount.map(cnt => Metrics.IncompleteCounterName -> cnt.toString) ++
snapshot.remoteAdaptersSuccessCount.map(cnt => Metrics.RemoteAdaptersSuccessCounterName -> cnt.toString) ++
snapshot.remoteAdaptersFailureCount.map(cnt => Metrics.RemoteAdaptersFailureCounterName -> cnt.toString) ++
snapshot.remoteAdaptersTimeoutCount.map(cnt => Metrics.RemoteAdaptersTimeoutCounterName -> cnt.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, FailureDetails, Processor, Payload => BadRowPayload}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.IpLookupsEnrichment
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{IpLookupsEnrichment, JavascriptScriptEnrichment}
import com.snowplowanalytics.snowplow.enrich.common.enrichments.{AtomicFields, MiscEnrichments}
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
Expand Down Expand Up @@ -187,21 +187,55 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck {
}

"enrich" should {
"update metrics with raw, good and bad counters" in {
val input = Stream.emits(List(Array.empty[Byte], EnrichSpec.payload))
TestEnvironment.make(input).use { test =>
"update metrics with raw, good, bad and incomplete counters" in {
val script = """
function process(event, params) {
if(event.getUser_ipaddress() == "foo") {
throw "BOOM";
}
return [ ];
}"""
val config = json"""{
"parameters": {
"script": ${ConversionUtils.encodeBase64Url(script)}
}
}"""
val schemaKey = SchemaKey(
"com.snowplowanalytics.snowplow",
"javascript_script_config",
"jsonschema",
SchemaVer.Full(1, 0, 0)
)
val jsEnrichConf =
JavascriptScriptEnrichment.parse(config, schemaKey).toOption.get

val context = EnrichSpec.context.copy(ipAddress = Some("foo"))
val payload = EnrichSpec.collectorPayload.copy(context = context)

val input = Stream.emits(
List(
Array.empty[Byte],
EnrichSpec.payload,
payload.toRaw
)
)

TestEnvironment.make(input, List(jsEnrichConf)).use { test =>
val enrichStream = Enrich.run[IO, Array[Byte]](test.env)
for {
_ <- enrichStream.compile.drain
bad <- test.bad
good <- test.good
incomplete <- test.incomplete
counter <- test.counter.get
} yield {
(counter.raw must_== 2L)
(counter.raw must_== 3L)
(counter.good must_== 1L)
(counter.bad must_== 1L)
(bad.size must_== 1)
(counter.bad must_== 2L)
(counter.incomplete must_== 1L)
(bad.size must_== 2)
(good.size must_== 1)
(incomplete.size must_== 1)
}
}
}
Expand Down Expand Up @@ -249,9 +283,10 @@ class EnrichSpec extends Specification with CatsEffect with ScalaCheck {
test
.run(_.copy(assetsUpdatePeriod = Some(1800.millis)))
.map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
(good must contain(exactly(one, two)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {
val testWithHttp = HttpServer.resource *> TestEnvironment.make(input, List(enrichment))
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
bad must beEmpty
pii must beEmpty
incomplete must beEmpty
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
Expand Down Expand Up @@ -117,9 +118,10 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {

TestEnvironment.make(input, List(enrichment)).use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
good must beEmpty
pii must beEmpty
incomplete must haveSize(nbEvents)
bad.collect { case ef: BadRow.EnrichmentFailures => ef } must haveSize(nbEvents)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ class IabEnrichmentSpec extends Specification with CatsEffect {
val testWithHttp = HttpServer.resource *> TestEnvironment.make(input, List(IabEnrichmentSpec.enrichmentConf))
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
Expand Down Expand Up @@ -95,9 +96,10 @@ class IabEnrichmentSpec extends Specification with CatsEffect {
val testWithHttp = HttpServer.resource *> TestEnvironment.make(input, List(IabEnrichmentSpec.enrichmentConf))
testWithHttp.use { test =>
test.run(_.copy(assetsUpdatePeriod = Some(1800.millis))).map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
good.map(_.derived_contexts) must contain(exactly(expectedOne, expectedTwo))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ class YauaaEnrichmentSpec extends Specification with CatsEffect {

TestEnvironment.make(input, List(enrichment)).use { test =>
test.run().map {
case (bad, pii, good) =>
case (bad, pii, good, incomplete) =>
(bad must be empty)
(pii must be empty)
(incomplete must be empty)
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
Expand Down
Loading

0 comments on commit ebe793f

Please sign in to comment.