Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Apr 9, 2024
1 parent 5e02a5e commit a42d538
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 103 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
env:
PGPASSWORD: supersecret1
- name: Run tests
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" TESTCONTAINERS_RYUK_DISABLED=true sbt coverage +test
env:
OER_KEY: ${{ secrets.OER_KEY }}
- name: Check Scala formatting
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ lazy val kinesisDistroless = project
.settings(libraryDependencies ++= kinesisDependencies ++ Seq(
// integration tests dependencies
specs2CEIt,
testContainersIt
testContainersIt,
dockerJavaIt
))
.settings(excludeDependencies ++= exclusions)
.settings(addCompilerPlugin(betterMonadicFor))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import org.slf4j.LoggerFactory
import retry.syntax.all._
import retry.RetryPolicies

import cats.implicits._

import cats.effect.{IO, Resource}

import cats.effect.testing.specs2.CatsEffect

import org.testcontainers.containers.{BindMode, GenericContainer => JGenericContainer, Network}
import org.testcontainers.containers.{BindMode, Network}
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.output.Slf4jLogConsumer

Expand All @@ -34,47 +36,66 @@ import com.snowplowanalytics.snowplow.enrich.kinesis.generated.BuildInfo

object Containers extends CatsEffect {

private val network = Network.newNetwork()
object Images {
case class DockerImage(image: String, tag: String) {
def toStr = s"$image:$tag"
}
val Localstack = DockerImage("localstack/localstack-light", "1.2.0")
val Enrich = DockerImage("snowplow/snowplow-enrich-kinesis", s"${BuildInfo.version}-distroless")
val MySQL = DockerImage("mysql", "8.0.31")
val HTTP = DockerImage("nginx", "1.23.2")
val Statsd = DockerImage("dblworks/statsd", "v0.10.2") // the official statsd/statsd size is monstrous
}

case class Localstack(
container: GenericContainer,
alias: String,
internalPort: Int,
mappedPort: Int
)

private val localstackPort = 4566
private val localstackAlias = "localstack"
private val network = Network.newNetwork()

val localstack = {
def localstack: Resource[IO, Localstack] = Resource.make {
val port = 4566
val container = GenericContainer(
dockerImage = "localstack/localstack-light:1.2.0",
fileSystemBind = Seq(
GenericContainer.FileSystemBind(
"modules/kinesis/src/it/resources/localstack",
"/docker-entrypoint-initaws.d",
BindMode.READ_ONLY
)
),
dockerImage = Images.Localstack.toStr,
env = Map(
"AWS_ACCESS_KEY_ID" -> "foo",
"AWS_SECRET_ACCESS_KEY" -> "bar"
),
waitStrategy = Wait.forLogMessage(".*Ready.*", 1),
exposedPorts = Seq(localstackPort)
exposedPorts = Seq(port)
)
container.underlyingUnsafeContainer.withNetwork(network)
container.underlyingUnsafeContainer.withNetworkAliases(localstackAlias)
container.container
val alias = "localstack"
container.underlyingUnsafeContainer.withNetworkAliases(alias)

IO.blocking(container.start()) *>
IO(
Localstack(
container,
alias,
port,
container.container.getMappedPort(port)
)
)
} {
l => IO.blocking(l.container.stop())
}

def localstackMappedPort = localstack.getMappedPort(localstackPort)

def enrich(
localstack: Localstack,
configPath: String,
testName: String,
needsLocalstack: Boolean,
enrichments: List[Enrichment],
uuid: String = UUID.randomUUID().toString,
waitLogMessage: String = "Running Enrich"
): Resource[IO, JGenericContainer[_]] = {
): Resource[IO, GenericContainer] = {
val streams = KinesisConfig.getStreams(uuid)

val container = GenericContainer(
dockerImage = s"snowplow/snowplow-enrich-kinesis:${BuildInfo.version}-distroless",
dockerImage = Images.Enrich.toStr,
env = Map(
"AWS_REGION" -> KinesisConfig.region,
"AWS_ACCESS_KEY_ID" -> "foo",
Expand All @@ -87,7 +108,7 @@ object Containers extends CatsEffect {
"STREAM_ENRICHED" -> streams.enriched,
"STREAM_BAD" -> streams.bad,
"STREAM_INCOMPLETE" -> streams.incomplete,
"LOCALSTACK_ENDPOINT" -> s"http://$localstackAlias:$localstackPort"
"LOCALSTACK_ENDPOINT" -> s"http://${localstack.alias}:${localstack.internalPort}"
),
fileSystemBind = Seq(
GenericContainer.FileSystemBind(
Expand All @@ -113,16 +134,16 @@ object Containers extends CatsEffect {
)
container.container.withNetwork(network)
Resource.make (
IO(startLocalstack(needsLocalstack, KinesisConfig.region, streams)) >>
IO(startContainerWithLogs(container.container, testName))
createStreams(localstack, KinesisConfig.region, streams) *>
startContainerWithLogs(container, testName)
)(
e => IO(e.stop())
e => IO.blocking(e.stop())
)
}

def mysqlServer: Resource[IO, JGenericContainer[_]] = Resource.make {
def mysqlServer: Resource[IO, GenericContainer] = Resource.make {
val container = GenericContainer(
dockerImage = "mysql:8.0.31",
dockerImage = Images.MySQL.toStr,
fileSystemBind = Seq(
GenericContainer.FileSystemBind(
"modules/kinesis/src/it/resources/mysql",
Expand All @@ -140,14 +161,14 @@ object Containers extends CatsEffect {
)
container.underlyingUnsafeContainer.withNetwork(network)
container.underlyingUnsafeContainer.withNetworkAliases("mysql")
IO(container.start()) >> IO.pure(container.container)
IO(container.start()) *> IO.pure(container)
} {
c => IO(c.stop())
}

def httpServer: Resource[IO, JGenericContainer[_]] = Resource.make {
def httpServer: Resource[IO, GenericContainer] = Resource.make {
val container = GenericContainer(
dockerImage = "nginx:1.23.2",
dockerImage = Images.HTTP.toStr,
fileSystemBind = Seq(
GenericContainer.FileSystemBind(
"modules/kinesis/src/it/resources/nginx/default.conf",
Expand All @@ -169,33 +190,32 @@ object Containers extends CatsEffect {
)
container.underlyingUnsafeContainer.withNetwork(network)
container.underlyingUnsafeContainer.withNetworkAliases("api")
IO(container.start()) >> IO.pure(container.container)
IO.blocking(container.start()) *> IO.pure(container)
} {
c => IO(c.stop())
c => IO.blocking(c.stop())
}

def statsdServer: Resource[IO, JGenericContainer[_]] = Resource.make {
val container = GenericContainer("dblworks/statsd:v0.10.2") // the official statsd/statsd size is monstrous
def statsdServer: Resource[IO, GenericContainer] = Resource.make {
val container = GenericContainer(Images.Statsd.toStr)
container.underlyingUnsafeContainer.withNetwork(network)
container.underlyingUnsafeContainer.withNetworkAliases("statsd")
container.underlyingUnsafeContainer.addExposedPort(8126)
IO(container.start()) >> IO.pure(container.container)
IO.blocking(container.start()) *> IO.pure(container)
} {
c => IO(c.stop())
c => IO.blocking(c.stop())
}

private def startContainerWithLogs(
container: JGenericContainer[_],
container: GenericContainer,
loggerName: String
): JGenericContainer[_] = {
): IO[GenericContainer] = {
val logger = LoggerFactory.getLogger(loggerName)
val logs = new Slf4jLogConsumer(logger)
container.start()
container.followOutput(logs)
container
IO.blocking(container.start()) *>
IO(container.container.followOutput(logs)).as(container)
}

def waitUntilStopped(container: JGenericContainer[_]): IO[Boolean] = {
def waitUntilStopped(container: GenericContainer): IO[Boolean] = {
val retryPolicy = RetryPolicies.limitRetriesByCumulativeDelay(
5.minutes,
RetryPolicies.capDelay[IO](
Expand All @@ -204,50 +224,32 @@ object Containers extends CatsEffect {
)
)

IO(container.isRunning()).retryingOnFailures(
IO(container.container.isRunning()).retryingOnFailures(
_ => IO.pure(false),
retryPolicy,
(_, _) => IO.unit
)
}

// synchronized so that start() isn't called by several threads at the same time.
// start() is blocking.
// Calling start() on an already started container has no effect.
private def startLocalstack(
needsLocalstack: Boolean,
region: String,
streams: KinesisConfig.Streams
): Unit = synchronized {
if(needsLocalstack) {
localstack.start()
createStreams(
localstack,
localstackPort,
region,
streams
)
} else ()
}

private def createStreams(
localstack: JGenericContainer[_],
port: Int,
localstack: Localstack,
region: String,
streams: KinesisConfig.Streams
): Unit =
List(streams.raw, streams.enriched, streams.bad, streams.incomplete).foreach { stream =>
localstack.execInContainer(
"aws",
s"--endpoint-url=http://127.0.0.1:$port",
"kinesis",
"create-stream",
"--stream-name",
stream,
"--shard-count",
"1",
"--region",
region
): IO[Unit] =
List(streams.raw, streams.enriched, streams.bad, streams.incomplete).traverse_ { stream =>
IO.blocking(
localstack.container.execInContainer(
"aws",
s"--endpoint-url=http://127.0.0.1:${localstack.internalPort}",
"kinesis",
"create-stream",
"--stream-name",
stream,
"--shard-count",
"1",
"--region",
region
)
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2024-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.enrich.kinesis

import com.github.dockerjava.core.DockerClientBuilder
import com.github.dockerjava.api.command.PullImageResultCallback
import com.github.dockerjava.api.model.PullResponseItem

object DockerPull {

/**
* A blocking operation that runs on main thread to pull container image before `CatsResource` is
* created. This operation is then not counted towards test timeout.
*/
def pull(image: String, tag: String): Unit =
DockerClientBuilder
.getInstance()
.build()
.pullImageCmd(image)
.withTag(tag)
.withPlatform("linux/amd64")
.exec(new PullImageResultCallback() {
override def onNext(item: PullResponseItem) = {
println(s"$image: ${item.getStatus()}")
super.onNext(item)
}
})
.awaitCompletion()
.onComplete()
}
Loading

0 comments on commit a42d538

Please sign in to comment.