From a42d538029335cce627e1c3039c81b88e4e06c96 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Mon, 8 Apr 2024 15:10:30 +0200 Subject: [PATCH] Address review feedback --- .github/workflows/test.yml | 2 +- build.sbt | 3 +- .../snowplow/enrich/kinesis/Containers.scala | 156 +++++++++--------- .../snowplow/enrich/kinesis/DockerPull.scala | 35 ++++ .../enrich/kinesis/EnrichKinesisSpec.scala | 60 ++++--- project/Dependencies.scala | 2 + 6 files changed, 155 insertions(+), 103 deletions(-) create mode 100644 modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/DockerPull.scala diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 43e503e63..42fd5ed22 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -43,7 +43,7 @@ jobs: env: PGPASSWORD: supersecret1 - name: Run tests - run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test + run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" TESTCONTAINERS_RYUK_DISABLED=true sbt coverage +test env: OER_KEY: ${{ secrets.OER_KEY }} - name: Check Scala formatting diff --git a/build.sbt b/build.sbt index ac5004a2a..324fc296c 100644 --- a/build.sbt +++ b/build.sbt @@ -100,7 +100,8 @@ lazy val kinesisDistroless = project .settings(libraryDependencies ++= kinesisDependencies ++ Seq( // integration tests dependencies specs2CEIt, - testContainersIt + testContainersIt, + dockerJavaIt )) .settings(excludeDependencies ++= exclusions) .settings(addCompilerPlugin(betterMonadicFor)) diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala index 5511487d9..adfd10428 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/Containers.scala @@ -19,11 +19,13 @@ import org.slf4j.LoggerFactory import retry.syntax.all._ import retry.RetryPolicies +import cats.implicits._ + import cats.effect.{IO, Resource} import cats.effect.testing.specs2.CatsEffect -import org.testcontainers.containers.{BindMode, GenericContainer => JGenericContainer, Network} +import org.testcontainers.containers.{BindMode, Network} import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.containers.output.Slf4jLogConsumer @@ -34,47 +36,66 @@ import com.snowplowanalytics.snowplow.enrich.kinesis.generated.BuildInfo object Containers extends CatsEffect { - private val network = Network.newNetwork() + object Images { + case class DockerImage(image: String, tag: String) { + def toStr = s"$image:$tag" + } + val Localstack = DockerImage("localstack/localstack-light", "1.2.0") + val Enrich = DockerImage("snowplow/snowplow-enrich-kinesis", s"${BuildInfo.version}-distroless") + val MySQL = DockerImage("mysql", "8.0.31") + val HTTP = DockerImage("nginx", "1.23.2") + val Statsd = DockerImage("dblworks/statsd", "v0.10.2") // the official statsd/statsd size is monstrous + } + + case class Localstack( + container: GenericContainer, + alias: String, + internalPort: Int, + mappedPort: Int + ) - private val localstackPort = 4566 - private val localstackAlias = "localstack" + private val network = Network.newNetwork() - val localstack = { + def localstack: Resource[IO, Localstack] = Resource.make { + val port = 4566 val container = GenericContainer( - dockerImage = "localstack/localstack-light:1.2.0", - fileSystemBind = Seq( - GenericContainer.FileSystemBind( - "modules/kinesis/src/it/resources/localstack", - "/docker-entrypoint-initaws.d", - BindMode.READ_ONLY - ) - ), + dockerImage = Images.Localstack.toStr, env = Map( "AWS_ACCESS_KEY_ID" -> "foo", "AWS_SECRET_ACCESS_KEY" -> "bar" ), waitStrategy = Wait.forLogMessage(".*Ready.*", 1), - exposedPorts = Seq(localstackPort) + exposedPorts = Seq(port) ) container.underlyingUnsafeContainer.withNetwork(network) - container.underlyingUnsafeContainer.withNetworkAliases(localstackAlias) - container.container + val alias = "localstack" + container.underlyingUnsafeContainer.withNetworkAliases(alias) + + IO.blocking(container.start()) *> + IO( + Localstack( + container, + alias, + port, + container.container.getMappedPort(port) + ) + ) + } { + l => IO.blocking(l.container.stop()) } - def localstackMappedPort = localstack.getMappedPort(localstackPort) - def enrich( + localstack: Localstack, configPath: String, testName: String, - needsLocalstack: Boolean, enrichments: List[Enrichment], uuid: String = UUID.randomUUID().toString, waitLogMessage: String = "Running Enrich" - ): Resource[IO, JGenericContainer[_]] = { + ): Resource[IO, GenericContainer] = { val streams = KinesisConfig.getStreams(uuid) val container = GenericContainer( - dockerImage = s"snowplow/snowplow-enrich-kinesis:${BuildInfo.version}-distroless", + dockerImage = Images.Enrich.toStr, env = Map( "AWS_REGION" -> KinesisConfig.region, "AWS_ACCESS_KEY_ID" -> "foo", @@ -87,7 +108,7 @@ object Containers extends CatsEffect { "STREAM_ENRICHED" -> streams.enriched, "STREAM_BAD" -> streams.bad, "STREAM_INCOMPLETE" -> streams.incomplete, - "LOCALSTACK_ENDPOINT" -> s"http://$localstackAlias:$localstackPort" + "LOCALSTACK_ENDPOINT" -> s"http://${localstack.alias}:${localstack.internalPort}" ), fileSystemBind = Seq( GenericContainer.FileSystemBind( @@ -113,16 +134,16 @@ object Containers extends CatsEffect { ) container.container.withNetwork(network) Resource.make ( - IO(startLocalstack(needsLocalstack, KinesisConfig.region, streams)) >> - IO(startContainerWithLogs(container.container, testName)) + createStreams(localstack, KinesisConfig.region, streams) *> + startContainerWithLogs(container, testName) )( - e => IO(e.stop()) + e => IO.blocking(e.stop()) ) } - def mysqlServer: Resource[IO, JGenericContainer[_]] = Resource.make { + def mysqlServer: Resource[IO, GenericContainer] = Resource.make { val container = GenericContainer( - dockerImage = "mysql:8.0.31", + dockerImage = Images.MySQL.toStr, fileSystemBind = Seq( GenericContainer.FileSystemBind( "modules/kinesis/src/it/resources/mysql", @@ -140,14 +161,14 @@ object Containers extends CatsEffect { ) container.underlyingUnsafeContainer.withNetwork(network) container.underlyingUnsafeContainer.withNetworkAliases("mysql") - IO(container.start()) >> IO.pure(container.container) + IO(container.start()) *> IO.pure(container) } { c => IO(c.stop()) } - def httpServer: Resource[IO, JGenericContainer[_]] = Resource.make { + def httpServer: Resource[IO, GenericContainer] = Resource.make { val container = GenericContainer( - dockerImage = "nginx:1.23.2", + dockerImage = Images.HTTP.toStr, fileSystemBind = Seq( GenericContainer.FileSystemBind( "modules/kinesis/src/it/resources/nginx/default.conf", @@ -169,33 +190,32 @@ object Containers extends CatsEffect { ) container.underlyingUnsafeContainer.withNetwork(network) container.underlyingUnsafeContainer.withNetworkAliases("api") - IO(container.start()) >> IO.pure(container.container) + IO.blocking(container.start()) *> IO.pure(container) } { - c => IO(c.stop()) + c => IO.blocking(c.stop()) } - def statsdServer: Resource[IO, JGenericContainer[_]] = Resource.make { - val container = GenericContainer("dblworks/statsd:v0.10.2") // the official statsd/statsd size is monstrous + def statsdServer: Resource[IO, GenericContainer] = Resource.make { + val container = GenericContainer(Images.Statsd.toStr) container.underlyingUnsafeContainer.withNetwork(network) container.underlyingUnsafeContainer.withNetworkAliases("statsd") container.underlyingUnsafeContainer.addExposedPort(8126) - IO(container.start()) >> IO.pure(container.container) + IO.blocking(container.start()) *> IO.pure(container) } { - c => IO(c.stop()) + c => IO.blocking(c.stop()) } private def startContainerWithLogs( - container: JGenericContainer[_], + container: GenericContainer, loggerName: String - ): JGenericContainer[_] = { + ): IO[GenericContainer] = { val logger = LoggerFactory.getLogger(loggerName) val logs = new Slf4jLogConsumer(logger) - container.start() - container.followOutput(logs) - container + IO.blocking(container.start()) *> + IO(container.container.followOutput(logs)).as(container) } - def waitUntilStopped(container: JGenericContainer[_]): IO[Boolean] = { + def waitUntilStopped(container: GenericContainer): IO[Boolean] = { val retryPolicy = RetryPolicies.limitRetriesByCumulativeDelay( 5.minutes, RetryPolicies.capDelay[IO]( @@ -204,50 +224,32 @@ object Containers extends CatsEffect { ) ) - IO(container.isRunning()).retryingOnFailures( + IO(container.container.isRunning()).retryingOnFailures( _ => IO.pure(false), retryPolicy, (_, _) => IO.unit ) } - // synchronized so that start() isn't called by several threads at the same time. - // start() is blocking. - // Calling start() on an already started container has no effect. - private def startLocalstack( - needsLocalstack: Boolean, - region: String, - streams: KinesisConfig.Streams - ): Unit = synchronized { - if(needsLocalstack) { - localstack.start() - createStreams( - localstack, - localstackPort, - region, - streams - ) - } else () - } - private def createStreams( - localstack: JGenericContainer[_], - port: Int, + localstack: Localstack, region: String, streams: KinesisConfig.Streams - ): Unit = - List(streams.raw, streams.enriched, streams.bad, streams.incomplete).foreach { stream => - localstack.execInContainer( - "aws", - s"--endpoint-url=http://127.0.0.1:$port", - "kinesis", - "create-stream", - "--stream-name", - stream, - "--shard-count", - "1", - "--region", - region + ): IO[Unit] = + List(streams.raw, streams.enriched, streams.bad, streams.incomplete).traverse_ { stream => + IO.blocking( + localstack.container.execInContainer( + "aws", + s"--endpoint-url=http://127.0.0.1:${localstack.internalPort}", + "kinesis", + "create-stream", + "--stream-name", + stream, + "--shard-count", + "1", + "--region", + region + ) ) } } diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/DockerPull.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/DockerPull.scala new file mode 100644 index 000000000..d38b20428 --- /dev/null +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/DockerPull.scala @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.enrich.kinesis + +import com.github.dockerjava.core.DockerClientBuilder +import com.github.dockerjava.api.command.PullImageResultCallback +import com.github.dockerjava.api.model.PullResponseItem + +object DockerPull { + + /** + * A blocking operation that runs on main thread to pull container image before `CatsResource` is + * created. This operation is then not counted towards test timeout. + */ + def pull(image: String, tag: String): Unit = + DockerClientBuilder + .getInstance() + .build() + .pullImageCmd(image) + .withTag(tag) + .withPlatform("linux/amd64") + .exec(new PullImageResultCallback() { + override def onNext(item: PullResponseItem) = { + println(s"$image: ${item.getStatus()}") + super.onNext(item) + } + }) + .awaitCompletion() + .onComplete() +} diff --git a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala index adcff0e3b..eb1ce325e 100644 --- a/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala +++ b/modules/kinesis/src/it/scala/com/snowplowanalytics/snowplow/enrich/kinesis/EnrichKinesisSpec.scala @@ -15,34 +15,46 @@ import java.util.UUID import scala.concurrent.duration._ import cats.effect.IO +import cats.effect.kernel.Resource -import cats.effect.testing.specs2.CatsEffect +import cats.effect.testing.specs2.CatsResource -import org.specs2.mutable.Specification -import org.specs2.specification.AfterAll +import org.specs2.mutable.SpecificationLike +import org.specs2.specification.BeforeAll import com.snowplowanalytics.snowplow.enrich.kinesis.enrichments._ + import com.snowplowanalytics.snowplow.enrich.common.fs2.test.CollectorPayloadGen -class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { +import com.snowplowanalytics.snowplow.enrich.kinesis.Containers.Localstack + +class EnrichKinesisSpec extends CatsResource[IO, Localstack] with SpecificationLike with BeforeAll { override protected val Timeout = 10.minutes - def afterAll: Unit = Containers.localstack.stop() + override def beforeAll(): Unit = { + DockerPull.pull(Containers.Images.Localstack.image, Containers.Images.Localstack.tag) + DockerPull.pull(Containers.Images.MySQL.image, Containers.Images.MySQL.tag) + DockerPull.pull(Containers.Images.HTTP.image, Containers.Images.HTTP.tag) + DockerPull.pull(Containers.Images.Statsd.image, Containers.Images.Statsd.tag) + super.beforeAll() + } + + override val resource: Resource[IO, Localstack] = Containers.localstack "enrich-kinesis" should { - "be able to parse the minimal config" in { + "be able to parse the minimal config" in withResource { localstack => Containers.enrich( + localstack, configPath = "config/config.kinesis.minimal.hocon", testName = "minimal", - needsLocalstack = false, enrichments = Nil ).use { e => - IO(e.getLogs must contain("Running Enrich")) + IO(e.container.getLogs must contain("Running Enrich")) } } - "emit the correct number of enriched events, bad rows and incomplete events" in { + "emit the correct number of enriched events, bad rows and incomplete events" in withResource { localstack => import utils._ val testName = "count" @@ -52,13 +64,13 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { val resources = for { _ <- Containers.enrich( + localstack, configPath = "modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon", testName = testName, - needsLocalstack = true, enrichments = Nil, uuid = uuid ) - enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid) + enrichPipe <- mkEnrichPipe(localstack.mappedPort, uuid) } yield enrichPipe val input = CollectorPayloadGen.generate[IO](nbGood, nbBad) @@ -75,7 +87,7 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { } } - "send the metrics to StatsD" in { + "send the metrics to StatsD" in withResource { localstack => import utils._ val testName = "statsd" @@ -85,17 +97,17 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { val resources = for { statsd <- Containers.statsdServer - statsdHost = statsd.getHost() - statsdAdminPort = statsd.getMappedPort(8126) + statsdHost = statsd.container.getHost() + statsdAdminPort = statsd.container.getMappedPort(8126) statsdAdmin <- mkStatsdAdmin(statsdHost, statsdAdminPort) _ <- Containers.enrich( + localstack, configPath = "modules/kinesis/src/it/resources/enrich/enrich-localstack-statsd.hocon", testName = testName, - needsLocalstack = true, enrichments = Nil, uuid = uuid ) - enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid) + enrichPipe <- mkEnrichPipe(localstack.mappedPort, uuid) } yield (enrichPipe, statsdAdmin) val input = CollectorPayloadGen.generate[IO](nbGood, nbBad) @@ -120,7 +132,7 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { } } - "run the enrichments and attach their context" in { + "run the enrichments and attach their context" in withResource { localstack => import utils._ val testName = "enrichments" @@ -140,13 +152,13 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { _ <- Containers.mysqlServer _ <- Containers.httpServer _ <- Containers.enrich( + localstack, configPath = "modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon", testName = testName, - needsLocalstack = true, enrichments = enrichments, uuid = uuid ) - enrichPipe <- mkEnrichPipe(Containers.localstackMappedPort, uuid) + enrichPipe <- mkEnrichPipe(localstack.mappedPort, uuid) } yield enrichPipe val input = CollectorPayloadGen.generate[IO](nbGood) @@ -166,21 +178,21 @@ class EnrichKinesisSpec extends Specification with AfterAll with CatsEffect { } } - "shutdown when it receives a SIGTERM" in { + "shutdown when it receives a SIGTERM" in withResource { localstack => Containers.enrich( + localstack, configPath = "modules/kinesis/src/it/resources/enrich/enrich-localstack.hocon", testName = "stop", - needsLocalstack = true, enrichments = Nil, waitLogMessage = "enrich.metrics" ).use { enrich => for { _ <- IO(println("stop - Sending signal")) - _ <- IO(enrich.getDockerClient().killContainerCmd(enrich.getContainerId()).withSignal("TERM").exec()) + _ <- IO(enrich.container.getDockerClient().killContainerCmd(enrich.container.getContainerId()).withSignal("TERM").exec()) _ <- Containers.waitUntilStopped(enrich) } yield { - enrich.isRunning() must beFalse - enrich.getLogs() must contain("Enrich stopped") + enrich.container.isRunning() must beFalse + enrich.container.getLogs() must contain("Enrich stopped") } } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 75eaccdf2..e143fb7e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -99,6 +99,7 @@ object Dependencies { val specs2CE = "1.5.0" val scalacheck = "1.17.0" val testcontainers = "0.40.10" + val dockerJava = "3.3.6" val parserCombinators = "2.1.1" val sentry = "1.7.30" @@ -163,6 +164,7 @@ object Dependencies { val eventGen = "com.snowplowanalytics" %% "snowplow-event-generator-core" % V.eventGen % Test val parserCombinators = "org.scala-lang.modules" %% "scala-parser-combinators" % V.parserCombinators % Test val testContainersIt = "com.dimafeng" %% "testcontainers-scala-core" % V.testcontainers % IntegrationTest + val dockerJavaIt = "com.github.docker-java" % "docker-java" % V.dockerJava % IntegrationTest val kinesisSdk = "com.amazonaws" % "aws-java-sdk-kinesis" % V.awsSdk val dynamodbSdk = "com.amazonaws" % "aws-java-sdk-dynamodb" % V.awsSdk