From 618ded9a68946af734d613d937564d6e4eaf29b4 Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Thu, 18 Jan 2024 12:19:43 +0100 Subject: [PATCH 1/2] Bump snowplow-badrows (will be merged into upgrading to Cats Effect 3 commit) --- .../snowplow/enrich/common/fs2/test/Utils.scala | 6 ++---- project/Dependencies.scala | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Utils.scala b/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Utils.scala index 50fca87d2..996c985c2 100644 --- a/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Utils.scala +++ b/modules/common-fs2/src/it/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/Utils.scala @@ -18,14 +18,12 @@ import io.circe.parser import com.snowplowanalytics.snowplow.badrows.BadRow import com.snowplowanalytics.iglu.core.SelfDescribingData -import com.snowplowanalytics.iglu.core.circe.implicits._ object Utils { def parseBadRow(s: String): Either[String, BadRow] = for { json <- parser.parse(s).leftMap(_.message) - sdj <- SelfDescribingData.parse(json).leftMap(_.message("Can't decode JSON as SDJ")) - br <- sdj.data.as[BadRow].leftMap(_.getMessage()) - } yield br + sdj <- json.as[SelfDescribingData[BadRow]].leftMap(_.message) + } yield sdj.data } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 09c5b459d..ffb5f23fe 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -62,7 +62,7 @@ object Dependencies { val scalaWeather = "2.0.0" val gatlingJsonpath = "0.6.14" val scalaUri = "1.5.1" - val badRows = "2.2.1" + val badRows = "2.3.0" val igluClient = "3.1.0" val snowplowRawEvent = "0.1.0" From 31f32519050f8ad2ce1e7767fc530ed30adb4ecb Mon Sep 17 00:00:00 2001 From: Benjamin Benoist Date: Wed, 17 Jan 2024 12:58:54 +0100 Subject: [PATCH 2/2] Switch from Blaze client to Ember client (close #853) --- .../enrich/common/fs2/Environment.scala | 36 +++++++++------- .../enrich/common/fs2/io/Clients.scala | 38 ++++++----------- .../test/resources/simplelogger.properties | 2 + .../enrich/common/fs2/AssetsSpec.scala | 6 +-- .../ApiRequestEnrichmentSpec.scala | 42 +++++++++++++++++-- .../common/fs2/test/TestEnvironment.scala | 15 ++++--- .../enrichments/EnrichmentRegistry.scala | 4 +- .../common/utils/HttpClient.scala | 26 +++++++++--- .../SpecHelpers.scala | 6 +-- project/Dependencies.scala | 2 +- 10 files changed, 112 insertions(+), 65 deletions(-) diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala index f7f8199ee..83f7a2448 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/Environment.scala @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2 import java.util.concurrent.TimeoutException import scala.concurrent.ExecutionContext -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import cats.Show import cats.data.EitherT @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf +import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} @@ -139,35 +140,41 @@ object Environment { final case class Enrichments[F[_]: Async]( registry: EnrichmentRegistry[F], configs: List[EnrichmentConf], - httpClient: HttpClient[F] + httpApiEnrichment: HttpClient[F] ) { /** Initialize same enrichments, specified by configs (in case DB files updated) */ def reinitialize(blockingEC: ExecutionContext, shifter: ShiftExecution[F]): F[Enrichments[F]] = - Enrichments.buildRegistry(configs, blockingEC, shifter, httpClient).map(registry => Enrichments(registry, configs, httpClient)) + Enrichments + .buildRegistry(configs, blockingEC, shifter, httpApiEnrichment) + .map(registry => Enrichments(registry, configs, httpApiEnrichment)) } object Enrichments { def make[F[_]: Async]( configs: List[EnrichmentConf], blockingEC: ExecutionContext, - shifter: ShiftExecution[F], - httpClient: HttpClient[F] + shifter: ShiftExecution[F] ): Resource[F, Ref[F, Enrichments[F]]] = - Resource.eval { - for { - registry <- buildRegistry[F](configs, blockingEC, shifter, httpClient) - ref <- Ref.of(Enrichments[F](registry, configs, httpClient)) - } yield ref - } + for { + // We don't want the HTTP client of API enrichment to be reinitialized each time the assets are refreshed + httpClient <- configs.collectFirst { case api: ApiRequestConf => api.api.timeout } match { + case Some(timeout) => + Clients.mkHttp(readTimeout = timeout.millis).map(HttpClient.fromHttp4sClient[F]) + case None => + Resource.pure[F, HttpClient[F]](HttpClient.noop[F]) + } + registry <- Resource.eval(buildRegistry[F](configs, blockingEC, shifter, httpClient)) + ref <- Resource.eval(Ref.of(Enrichments[F](registry, configs, httpClient))) + } yield ref def buildRegistry[F[_]: Async]( configs: List[EnrichmentConf], blockingEC: ExecutionContext, shifter: ShiftExecution[F], - httpClient: HttpClient[F] + httpApiEnrichment: HttpClient[F] ) = - EnrichmentRegistry.build[F](configs, shifter, httpClient, blockingEC).value.flatMap { + EnrichmentRegistry.build[F](configs, shifter, httpApiEnrichment, blockingEC).value.flatMap { case Right(reg) => Async[F].pure(reg) case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error)) } @@ -198,7 +205,6 @@ object Environment { bad <- sinkBad pii <- sinkPii.sequence http4s <- Clients.mkHttp() - http = HttpClient.fromHttp4sClient(http4s) clts <- clients.map(Clients.init(http4s, _)) igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty @@ -210,7 +216,7 @@ object Environment { sem <- Resource.eval(Semaphore(1L)) assetsState <- Resource.eval(Assets.State.make[F](sem, clts, assets)) shifter <- ShiftExecution.ofSingleThread[F] - enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter, http) + enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter) } yield Environment[F, A]( igluClient, Http4sRegistryLookup(http4s), diff --git a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala index 9161990fa..8d14cbf06 100644 --- a/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala +++ b/modules/common-fs2/src/main/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/io/Clients.scala @@ -25,9 +25,8 @@ import fs2.Stream import org.http4s.{Headers, Request, Uri} import org.http4s.client.defaults import org.http4s.client.{Client => Http4sClient} -import org.http4s.client.middleware.{Retry, RetryPolicy} -import org.http4s.blaze.client.BlazeClientBuilder -import org.http4s.blaze.pipeline.Command +import org.http4s.client.middleware.Retry +import org.http4s.ember.client.EmberClientBuilder import Clients._ @@ -67,29 +66,16 @@ object Clients { def mkHttp[F[_]: Async]( connectionTimeout: FiniteDuration = defaults.ConnectTimeout, readTimeout: FiniteDuration = defaults.RequestTimeout, - maxConnections: Int = 10 // http4s uses 10 by default - ): Resource[F, Http4sClient[F]] = - BlazeClientBuilder[F] - .withConnectTimeout(connectionTimeout) - .withRequestTimeout(readTimeout) - .withMaxTotalConnections(maxConnections) - .resource - .map(Retry[F](retryPolicy, redactHeadersWhen)) - - private def retryPolicy[F[_]] = - RetryPolicy[F]( - backoff, - retriable = { - //EOF error has to be retried explicitly for blaze client, see https://github.com/snowplow/enrich/issues/692 - case (_, Left(Command.EOF)) => true - case _ => false - } - ) - - //retry once after 100 mills - private def backoff(attemptNumber: Int): Option[FiniteDuration] = - if (attemptNumber > 1) None - else Some(100.millis) + maxConnections: Int = 100 // default of Ember client + ): Resource[F, Http4sClient[F]] = { + val builder = EmberClientBuilder + .default[F] + .withTimeout(readTimeout) + .withIdleConnectionTime(connectionTimeout) + .withMaxTotal(maxConnections) + val retryPolicy = builder.retryPolicy + builder.build.map(Retry[F](retryPolicy, redactHeadersWhen)) + } private def redactHeadersWhen(header: CIString) = (Headers.SensitiveHeaders + CIString("apikey")).contains(header) diff --git a/modules/common-fs2/src/test/resources/simplelogger.properties b/modules/common-fs2/src/test/resources/simplelogger.properties index 465d9d8e3..8475581c7 100644 --- a/modules/common-fs2/src/test/resources/simplelogger.properties +++ b/modules/common-fs2/src/test/resources/simplelogger.properties @@ -11,3 +11,5 @@ org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.Enri org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.Assets=warn org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment=info org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.test.HttpServer=info + +org.slf4j.simpleLogger.log.org.http4s.client.middleware.Retry=error diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AssetsSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AssetsSpec.scala index 661ab04db..56f32e381 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AssetsSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/AssetsSpec.scala @@ -29,7 +29,7 @@ import cats.effect.unsafe.implicits.global import cats.effect.testing.specs2.CatsEffect -import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} +import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients @@ -116,9 +116,7 @@ class AssetsSpec extends Specification with CatsEffect with ScalaCheck { for { shiftExecution <- ShiftExecution.ofSingleThread[IO] sem <- Resource.eval(Semaphore[IO](1L)) - http4s <- Clients.mkHttp[IO]() - http = HttpClient.fromHttp4sClient(http4s) - enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution, http) + enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution) _ <- SpecHelpers.filesResource(TestFiles) } yield (shiftExecution, sem, enrichments) diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/enrichments/ApiRequestEnrichmentSpec.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/enrichments/ApiRequestEnrichmentSpec.scala index 53396ef3a..9e26d16c5 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/enrichments/ApiRequestEnrichmentSpec.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/enrichments/ApiRequestEnrichmentSpec.scala @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ import com.snowplowanalytics.snowplow.enrich.common.fs2.enrichments.ApiRequestEnrichmentSpec.unstructEvent import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec import com.snowplowanalytics.snowplow.enrich.common.fs2.test._ +import com.snowplowanalytics.snowplow.badrows.BadRow class ApiRequestEnrichmentSpec extends Specification with CatsEffect { @@ -84,9 +85,44 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect { testWithHttp.use { test => test.run().map { case (bad, pii, good) => - (bad must be empty) - (pii must be empty) - (good.map(_.derived_contexts) must contain(exactly(expected))) + bad must beEmpty + pii must beEmpty + good.map(_.derived_contexts) must contain(exactly(expected)) + } + } + } + + "generate bad rows if API server is not available" in { + val nbEvents = 1000 + val input = Stream((1 to nbEvents).toList: _*) + .map { i => + json"""{ + "schema": "iglu:com.acme/test/jsonschema/1-0-1", + "data": {"path": {"id": $i}} + }""" + } + .map { ue => + EnrichSpec.collectorPayload.copy( + querystring = new BasicNameValuePair("ue_px", unstructEvent(ue)) :: EnrichSpec.collectorPayload.querystring + ) + } + .map(_.toRaw) + + val enrichment = ApiRequestConf( + SchemaKey("com.acme", "enrichment", "jsonschema", SchemaVer.Full(1, 0, 0)), + List(Input.Json("key1", "unstruct_event", SchemaCriterion("com.acme", "test", "jsonschema", 1), "$.path.id")), + HttpApi("GET", "http://foo.bar.unassigned/{{key1}}", 2000, Authentication(None)), + List(RegistryOutput("iglu:com.acme/output/jsonschema/1-0-0", Some(JsonOutput("$")))), + Cache(1, 1000), + ignoreOnError = false + ) + + TestEnvironment.make(input, List(enrichment)).use { test => + test.run().map { + case (bad, pii, good) => + good must beEmpty + pii must beEmpty + bad.collect { case ef: BadRow.EnrichmentFailures => ef } must haveSize(nbEvents) } } } diff --git a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala index 5ceda5a07..8187f70d0 100644 --- a/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala +++ b/modules/common-fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/common/fs2/test/TestEnvironment.scala @@ -34,6 +34,8 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLook import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.iglu.core.SelfDescribingData + import com.snowplowanalytics.snowplow.badrows.BadRow import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas @@ -41,7 +43,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf -import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution} +import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment} import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings} @@ -112,7 +114,6 @@ object TestEnvironment extends CatsEffect { def make(source: Stream[IO, Array[Byte]], enrichments: List[EnrichmentConf] = Nil): Resource[IO, TestEnvironment[Array[Byte]]] = for { http4s <- Clients.mkHttp[IO]() - http = HttpClient.fromHttp4sClient(http4s) _ <- SpecHelpers.filesResource(enrichments.flatMap(_.filesToCache).map(p => Path(p._2))) counter <- Resource.eval(Counter.make[IO]) metrics = Counter.mkCounterMetrics[IO](counter) @@ -122,7 +123,7 @@ object TestEnvironment extends CatsEffect { sem <- Resource.eval(Semaphore[IO](1L)) assetsState <- Resource.eval(Assets.State.make(sem, clients, enrichments.flatMap(_.filesToCache))) shifter <- ShiftExecution.ofSingleThread[IO] - enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter, http) + enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter) goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty)) badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty)) @@ -168,7 +169,11 @@ object TestEnvironment extends CatsEffect { .parse(badRowStr) .getOrElse(throw new RuntimeException(s"Error parsing bad row json: $badRowStr")) parsed - .as[BadRow] - .getOrElse(throw new RuntimeException(s"Error decoding bad row: $parsed")) + .as[SelfDescribingData[BadRow]] match { + case Left(e) => + throw new RuntimeException(s"Error decoding bad row $parsed", e) + case Right(sdj) => + sdj.data + } } } diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala index beec67e31..67bb1bbad 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentRegistry.scala @@ -109,14 +109,14 @@ object EnrichmentRegistry { def build[F[_]: Async]( confs: List[EnrichmentConf], shifter: ShiftExecution[F], - httpClient: HttpClient[F], + httpApiEnrichment: HttpClient[F], blockingEC: ExecutionContext ): EitherT[F, String, EnrichmentRegistry[F]] = confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) => e match { case c: ApiRequestConf => for { - enrichment <- EitherT.right(c.enrichment[F](httpClient)) + enrichment <- EitherT.right(c.enrichment[F](httpApiEnrichment)) registry <- er } yield registry.copy(apiRequest = enrichment.some) case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some)) diff --git a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/HttpClient.scala b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/HttpClient.scala index 449f68361..302d68ee9 100644 --- a/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/HttpClient.scala +++ b/modules/common/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/utils/HttpClient.scala @@ -14,11 +14,10 @@ package com.snowplowanalytics.snowplow.enrich.common.utils import cats.effect.kernel.Sync import cats.implicits._ -import fs2.Stream import org.typelevel.ci.CIString import org.http4s.client.{Client => Http4sClient} -import org.http4s.headers.Authorization -import org.http4s.{BasicCredentials, EmptyBody, EntityBody, Header, Headers, Method, Request, Status, Uri} +import org.http4s.headers.{Authorization, `Content-Type`} +import org.http4s.{BasicCredentials, Header, Headers, MediaType, Method, Request, Status, Uri} trait HttpClient[F[_]] { def getResponse( @@ -58,14 +57,17 @@ object HttpClient { case Left(parseFailure) => Sync[F].pure(new IllegalArgumentException(s"uri [$uri] is not valid: ${parseFailure.sanitized}").asLeft[String]) case Right(validUri) => - val request = Request[F]( + val requestWithoutBody = Request[F]( uri = validUri, method = Method.fromString(method).getOrElse(Method.GET), - body = body.fold[EntityBody[F]](EmptyBody)(s => Stream.emits(s.getBytes)), headers = getHeaders(authUser, authPassword) ) + val requestWithBody = body.fold(requestWithoutBody)(s => + requestWithoutBody.withEntity(s).withContentType(`Content-Type`(MediaType.application.json)) + ) + http4sClient - .run(request) + .run(requestWithBody) .use[Either[Throwable, String]] { response => val body = response.bodyText.compile.string response.status.responseClass match { @@ -76,4 +78,16 @@ object HttpClient { .handleError(_.asLeft[String]) } } + + def noop[F[_]: Sync]: HttpClient[F] = + new HttpClient[F] { + override def getResponse( + uri: String, + authUser: Option[String], + authPassword: Option[String], + body: Option[String], + method: String + ): F[Either[Throwable, String]] = + Sync[F].raiseError(new IllegalStateException("HTTP client not implemented")) + } } diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala index 6f2b91cbb..5614b22f6 100644 --- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala +++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/SpecHelpers.scala @@ -23,7 +23,7 @@ import cats.effect.IO import cats.effect.kernel.Resource import cats.effect.unsafe.implicits.global -import org.http4s.blaze.client.BlazeClientBuilder +import org.http4s.ember.client.EmberClientBuilder import cats.effect.testing.specs2.CatsEffect @@ -91,8 +91,8 @@ object SpecHelpers extends CatsEffect { val registryLookup = JavaNetRegistryLookup.ioLookupInstance[IO] val blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool) - private val http4sClient = BlazeClientBuilder[IO].resource - val httpClient = http4sClient.map(HttpClient.fromHttp4sClient[IO]) + + val httpClient = EmberClientBuilder.default[IO].build.map(HttpClient.fromHttp4sClient[IO]) private type NvPair = (String, String) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ffb5f23fe..90744e33d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -194,7 +194,7 @@ object Dependencies { .exclude("software.amazon.glue", "schema-registry-serde") val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2 % Runtime val azureIdentity = "com.azure" % "azure-identity" % V.azureIdentity - val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s + val http4sClient = "org.http4s" %% "http4s-ember-client" % V.http4s val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry