Skip to content

Commit

Permalink
Switch from Blaze client to Ember client (close #853)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Jan 22, 2024
1 parent 618ded9 commit 31f3251
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2
import java.util.concurrent.TimeoutException

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

import cats.Show
import cats.data.EitherT
Expand Down Expand Up @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}

Expand Down Expand Up @@ -139,35 +140,41 @@ object Environment {
/**
 * Bundles an enrichment registry with the inputs it was built from, so the registry
 * can be rebuilt later from the same inputs.
 *
 * Fields: `registry` is the built registry, `configs` the enrichment configurations it
 * was built from, and the HTTP client field is the client handed to `buildRegistry`.
 */
final case class Enrichments[F[_]: Async](
registry: EnrichmentRegistry[F],
configs: List[EnrichmentConf],
httpClient: HttpClient[F]
httpApiEnrichment: HttpClient[F]
) {

/**
 * Initialize same enrichments, specified by configs (in case DB files updated).
 *
 * Rebuilds the registry from the stored `configs` and the stored HTTP client (the
 * client itself is reused, not recreated) and returns a new `Enrichments` value.
 */
def reinitialize(blockingEC: ExecutionContext, shifter: ShiftExecution[F]): F[Enrichments[F]] =
Enrichments.buildRegistry(configs, blockingEC, shifter, httpClient).map(registry => Enrichments(registry, configs, httpClient))
Enrichments
.buildRegistry(configs, blockingEC, shifter, httpApiEnrichment)
.map(registry => Enrichments(registry, configs, httpApiEnrichment))
}

object Enrichments {
/**
 * Builds the initial `Enrichments` value and stores it in a `Ref`, all inside a `Resource`.
 *
 * The HTTP client given to the registry is chosen from `configs`: when at least one
 * `ApiRequestConf` is present, a client is created with `Clients.mkHttp` using the timeout
 * of the first such config; otherwise `HttpClient.noop` is used.
 */
def make[F[_]: Async](
configs: List[EnrichmentConf],
blockingEC: ExecutionContext,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
shifter: ShiftExecution[F]
): Resource[F, Ref[F, Enrichments[F]]] =
Resource.eval {
for {
registry <- buildRegistry[F](configs, blockingEC, shifter, httpClient)
ref <- Ref.of(Enrichments[F](registry, configs, httpClient))
} yield ref
}
for {
// We don't want the HTTP client of API enrichment to be reinitialized each time the assets are refreshed,
// so it is acquired once here and then stored in (and reused by) Enrichments.
// Only the first ApiRequestConf found in configs contributes its timeout.
httpClient <- configs.collectFirst { case api: ApiRequestConf => api.api.timeout } match {
case Some(timeout) =>
// The timeout is converted with `.millis`, i.e. it is interpreted as milliseconds here.
Clients.mkHttp(readTimeout = timeout.millis).map(HttpClient.fromHttp4sClient[F])
case None =>
// No API request enrichment configured: no real client is created.
Resource.pure[F, HttpClient[F]](HttpClient.noop[F])
}
registry <- Resource.eval(buildRegistry[F](configs, blockingEC, shifter, httpClient))
ref <- Resource.eval(Ref.of(Enrichments[F](registry, configs, httpClient)))
} yield ref

/**
 * Builds an `EnrichmentRegistry` from `configs` via `EnrichmentRegistry.build`.
 *
 * A `Right` result is returned as-is inside `F`; a `Left` is turned into a failed effect
 * carrying a `RuntimeException` constructed from the error value.
 */
def buildRegistry[F[_]: Async](
configs: List[EnrichmentConf],
blockingEC: ExecutionContext,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
httpApiEnrichment: HttpClient[F]
) =
EnrichmentRegistry.build[F](configs, shifter, httpClient, blockingEC).value.flatMap {
EnrichmentRegistry.build[F](configs, shifter, httpApiEnrichment, blockingEC).value.flatMap {
case Right(reg) => Async[F].pure(reg)
// The Left value is raised in F instead of being returned to the caller as an Either.
case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error))
}
Expand Down Expand Up @@ -198,7 +205,6 @@ object Environment {
bad <- sinkBad
pii <- sinkPii.sequence
http4s <- Clients.mkHttp()
http = HttpClient.fromHttp4sClient(http4s)
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
Expand All @@ -210,7 +216,7 @@ object Environment {
sem <- Resource.eval(Semaphore(1L))
assetsState <- Resource.eval(Assets.State.make[F](sem, clts, assets))
shifter <- ShiftExecution.ofSingleThread[F]
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter, http)
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter)
} yield Environment[F, A](
igluClient,
Http4sRegistryLookup(http4s),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import fs2.Stream
import org.http4s.{Headers, Request, Uri}
import org.http4s.client.defaults
import org.http4s.client.{Client => Http4sClient}
import org.http4s.client.middleware.{Retry, RetryPolicy}
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.pipeline.Command
import org.http4s.client.middleware.Retry
import org.http4s.ember.client.EmberClientBuilder

import Clients._

Expand Down Expand Up @@ -67,29 +66,16 @@ object Clients {
/**
 * Builds an http4s client as a `Resource`, wrapped in the `Retry` middleware.
 * `redactHeadersWhen` is handed to `Retry` as its header-redaction predicate.
 *
 * @param connectionTimeout forwarded to `withConnectTimeout` on the Blaze builder and to
 *                          `withIdleConnectionTime` on the Ember builder
 * @param readTimeout       forwarded to `withRequestTimeout` (Blaze) / `withTimeout` (Ember)
 * @param maxConnections    forwarded to `withMaxTotalConnections` (Blaze) / `withMaxTotal` (Ember)
 */
def mkHttp[F[_]: Async](
connectionTimeout: FiniteDuration = defaults.ConnectTimeout,
readTimeout: FiniteDuration = defaults.RequestTimeout,
maxConnections: Int = 10 // http4s uses 10 by default
): Resource[F, Http4sClient[F]] =
BlazeClientBuilder[F]
.withConnectTimeout(connectionTimeout)
.withRequestTimeout(readTimeout)
.withMaxTotalConnections(maxConnections)
.resource
.map(Retry[F](retryPolicy, redactHeadersWhen))

// Retry policy used with the Blaze client: only an EOF failure is marked retriable.
private def retryPolicy[F[_]] =
RetryPolicy[F](
backoff,
retriable = {
// EOF error has to be retried explicitly for blaze client, see https://github.com/snowplow/enrich/issues/692
case (_, Left(Command.EOF)) => true
case _ => false
}
)

// Retry once after 100 ms: a delay is returned for the first attempt only, then None.
private def backoff(attemptNumber: Int): Option[FiniteDuration] =
if (attemptNumber > 1) None
else Some(100.millis)
maxConnections: Int = 100 // default of Ember client
): Resource[F, Http4sClient[F]] = {
val builder = EmberClientBuilder
.default[F]
.withTimeout(readTimeout)
// NOTE(review): a connection timeout is used here as the idle-connection time — confirm this mapping is intended.
.withIdleConnectionTime(connectionTimeout)
.withMaxTotal(maxConnections)
// Reuse the retry policy carried by the Ember builder for the Retry middleware below.
// NOTE(review): Ember may already apply this policy internally — confirm wrapping with Retry again is intended.
val retryPolicy = builder.retryPolicy
builder.build.map(Retry[F](retryPolicy, redactHeadersWhen))
}

/**
 * Tells whether a header must be redacted: true for http4s' sensitive headers
 * and for the `apikey` header.
 */
private def redactHeadersWhen(header: CIString) = {
  val redacted = Headers.SensitiveHeaders + CIString("apikey")
  redacted.contains(header)
}
Expand Down
2 changes: 2 additions & 0 deletions modules/common-fs2/src/test/resources/simplelogger.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.Enri
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.Assets=warn
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment=info
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.test.HttpServer=info

org.slf4j.simpleLogger.log.org.http4s.client.middleware.Retry=error
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import cats.effect.unsafe.implicits.global

import cats.effect.testing.specs2.CatsEffect

import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

Expand Down Expand Up @@ -116,9 +116,7 @@ class AssetsSpec extends Specification with CatsEffect with ScalaCheck {
for {
shiftExecution <- ShiftExecution.ofSingleThread[IO]
sem <- Resource.eval(Semaphore[IO](1L))
http4s <- Clients.mkHttp[IO]()
http = HttpClient.fromHttp4sClient(http4s)
enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution, http)
enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution)
_ <- SpecHelpers.filesResource(TestFiles)
} yield (shiftExecution, sem, enrichments)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
import com.snowplowanalytics.snowplow.enrich.common.fs2.enrichments.ApiRequestEnrichmentSpec.unstructEvent
import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec
import com.snowplowanalytics.snowplow.enrich.common.fs2.test._
import com.snowplowanalytics.snowplow.badrows.BadRow

class ApiRequestEnrichmentSpec extends Specification with CatsEffect {

Expand Down Expand Up @@ -84,9 +85,44 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
(bad must be empty)
(pii must be empty)
(good.map(_.derived_contexts) must contain(exactly(expected)))
bad must beEmpty
pii must beEmpty
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
}

// Feeds nbEvents events through an API request enrichment whose URI points at
// http://foo.bar.unassigned and expects every event to end up as an EnrichmentFailures bad row.
"generate bad rows if API server is not available" in {
val nbEvents = 1000
// One self-describing JSON per event; the event index is used as the `id` looked up by the enrichment input.
val input = Stream((1 to nbEvents).toList: _*)
.map { i =>
json"""{
"schema": "iglu:com.acme/test/jsonschema/1-0-1",
"data": {"path": {"id": $i}}
}"""
}
.map { ue =>
// Attach the event to the base collector payload as the `ue_px` querystring parameter.
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ue_px", unstructEvent(ue)) :: EnrichSpec.collectorPayload.querystring
)
}
.map(_.toRaw)

// `key1` is extracted from the unstruct event at `$.path.id` and substituted into the URI template.
// NOTE(review): the host is presumably unresolvable and 2000 is presumably the timeout in ms — confirm against HttpApi.
// ignoreOnError = false: presumably makes a failed API call fail the event instead of being skipped — verify.
val enrichment = ApiRequestConf(
SchemaKey("com.acme", "enrichment", "jsonschema", SchemaVer.Full(1, 0, 0)),
List(Input.Json("key1", "unstruct_event", SchemaCriterion("com.acme", "test", "jsonschema", 1), "$.path.id")),
HttpApi("GET", "http://foo.bar.unassigned/{{key1}}", 2000, Authentication(None)),
List(RegistryOutput("iglu:com.acme/output/jsonschema/1-0-0", Some(JsonOutput("$")))),
Cache(1, 1000),
ignoreOnError = false
)

TestEnvironment.make(input, List(enrichment)).use { test =>
test.run().map {
case (bad, pii, good) =>
good must beEmpty
pii must beEmpty
// Only bad rows of type EnrichmentFailures are counted; there must be exactly one per input event.
bad.collect { case ef: BadRow.EnrichmentFailures => ef } must haveSize(nbEvents)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLook

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.iglu.core.SelfDescribingData

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment}
import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings}
Expand Down Expand Up @@ -112,7 +114,6 @@ object TestEnvironment extends CatsEffect {
def make(source: Stream[IO, Array[Byte]], enrichments: List[EnrichmentConf] = Nil): Resource[IO, TestEnvironment[Array[Byte]]] =
for {
http4s <- Clients.mkHttp[IO]()
http = HttpClient.fromHttp4sClient(http4s)
_ <- SpecHelpers.filesResource(enrichments.flatMap(_.filesToCache).map(p => Path(p._2)))
counter <- Resource.eval(Counter.make[IO])
metrics = Counter.mkCounterMetrics[IO](counter)
Expand All @@ -122,7 +123,7 @@ object TestEnvironment extends CatsEffect {
sem <- Resource.eval(Semaphore[IO](1L))
assetsState <- Resource.eval(Assets.State.make(sem, clients, enrichments.flatMap(_.filesToCache)))
shifter <- ShiftExecution.ofSingleThread[IO]
enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter, http)
enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter)
goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty))
Expand Down Expand Up @@ -168,7 +169,11 @@ object TestEnvironment extends CatsEffect {
.parse(badRowStr)
.getOrElse(throw new RuntimeException(s"Error parsing bad row json: $badRowStr"))
parsed
.as[BadRow]
.getOrElse(throw new RuntimeException(s"Error decoding bad row: $parsed"))
.as[SelfDescribingData[BadRow]] match {
case Left(e) =>
throw new RuntimeException(s"Error decoding bad row $parsed", e)
case Right(sdj) =>
sdj.data
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ object EnrichmentRegistry {
def build[F[_]: Async](
confs: List[EnrichmentConf],
shifter: ShiftExecution[F],
httpClient: HttpClient[F],
httpApiEnrichment: HttpClient[F],
blockingEC: ExecutionContext
): EitherT[F, String, EnrichmentRegistry[F]] =
confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
e match {
case c: ApiRequestConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](httpClient))
enrichment <- EitherT.right(c.enrichment[F](httpApiEnrichment))
registry <- er
} yield registry.copy(apiRequest = enrichment.some)
case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ package com.snowplowanalytics.snowplow.enrich.common.utils

import cats.effect.kernel.Sync
import cats.implicits._
import fs2.Stream
import org.typelevel.ci.CIString
import org.http4s.client.{Client => Http4sClient}
import org.http4s.headers.Authorization
import org.http4s.{BasicCredentials, EmptyBody, EntityBody, Header, Headers, Method, Request, Status, Uri}
import org.http4s.headers.{Authorization, `Content-Type`}
import org.http4s.{BasicCredentials, Header, Headers, MediaType, Method, Request, Status, Uri}

trait HttpClient[F[_]] {
def getResponse(
Expand Down Expand Up @@ -58,14 +57,17 @@ object HttpClient {
case Left(parseFailure) =>
Sync[F].pure(new IllegalArgumentException(s"uri [$uri] is not valid: ${parseFailure.sanitized}").asLeft[String])
case Right(validUri) =>
val request = Request[F](
val requestWithoutBody = Request[F](
uri = validUri,
method = Method.fromString(method).getOrElse(Method.GET),
body = body.fold[EntityBody[F]](EmptyBody)(s => Stream.emits(s.getBytes)),
headers = getHeaders(authUser, authPassword)
)
val requestWithBody = body.fold(requestWithoutBody)(s =>
requestWithoutBody.withEntity(s).withContentType(`Content-Type`(MediaType.application.json))
)

http4sClient
.run(request)
.run(requestWithBody)
.use[Either[Throwable, String]] { response =>
val body = response.bodyText.compile.string
response.status.responseClass match {
Expand All @@ -76,4 +78,16 @@ object HttpClient {
.handleError(_.asLeft[String])
}
}

/**
 * Placeholder [[HttpClient]] that never performs an HTTP call.
 *
 * Every invocation of `getResponse` fails the effect `F` with an
 * `IllegalStateException`; none of the arguments is inspected.
 */
def noop[F[_]: Sync]: HttpClient[F] =
  new HttpClient[F] {
    override def getResponse(
      uri: String,
      authUser: Option[String],
      authPassword: Option[String],
      body: Option[String],
      method: String
    ): F[Either[Throwable, String]] = {
      val notImplemented = new IllegalStateException("HTTP client not implemented")
      // The failure is raised in F itself, not returned as a Left.
      Sync[F].raiseError[Either[Throwable, String]](notImplemented)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global

import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.ember.client.EmberClientBuilder

import cats.effect.testing.specs2.CatsEffect

Expand Down Expand Up @@ -91,8 +91,8 @@ object SpecHelpers extends CatsEffect {
val registryLookup = JavaNetRegistryLookup.ioLookupInstance[IO]

val blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
private val http4sClient = BlazeClientBuilder[IO].resource
val httpClient = http4sClient.map(HttpClient.fromHttp4sClient[IO])

val httpClient = EmberClientBuilder.default[IO].build.map(HttpClient.fromHttp4sClient[IO])

private type NvPair = (String, String)

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ object Dependencies {
.exclude("software.amazon.glue", "schema-registry-serde")
val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2 % Runtime
val azureIdentity = "com.azure" % "azure-identity" % V.azureIdentity
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val http4sClient = "org.http4s" %% "http4s-ember-client" % V.http4s
val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s
val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats
val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
Expand Down

0 comments on commit 31f3251

Please sign in to comment.