Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from Blaze client to Ember client #854

Merged
merged 2 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ import io.circe.parser

import com.snowplowanalytics.snowplow.badrows.BadRow
import com.snowplowanalytics.iglu.core.SelfDescribingData
import com.snowplowanalytics.iglu.core.circe.implicits._

object Utils {

def parseBadRow(s: String): Either[String, BadRow] =
for {
json <- parser.parse(s).leftMap(_.message)
sdj <- SelfDescribingData.parse(json).leftMap(_.message("Can't decode JSON as SDJ"))
br <- sdj.data.as[BadRow].leftMap(_.getMessage())
} yield br
sdj <- json.as[SelfDescribingData[BadRow]].leftMap(_.message)
} yield sdj.data
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2
import java.util.concurrent.TimeoutException

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

import cats.Show
import cats.data.EitherT
Expand Down Expand Up @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf.ApiRequestConf
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}

Expand Down Expand Up @@ -139,35 +140,41 @@ object Environment {
final case class Enrichments[F[_]: Async](
registry: EnrichmentRegistry[F],
configs: List[EnrichmentConf],
httpClient: HttpClient[F]
httpApiEnrichment: HttpClient[F]
) {

/** Initialize same enrichments, specified by configs (in case DB files updated) */
def reinitialize(blockingEC: ExecutionContext, shifter: ShiftExecution[F]): F[Enrichments[F]] =
Enrichments.buildRegistry(configs, blockingEC, shifter, httpClient).map(registry => Enrichments(registry, configs, httpClient))
Enrichments
.buildRegistry(configs, blockingEC, shifter, httpApiEnrichment)
.map(registry => Enrichments(registry, configs, httpApiEnrichment))
}

object Enrichments {
def make[F[_]: Async](
configs: List[EnrichmentConf],
blockingEC: ExecutionContext,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
shifter: ShiftExecution[F]
): Resource[F, Ref[F, Enrichments[F]]] =
Resource.eval {
for {
registry <- buildRegistry[F](configs, blockingEC, shifter, httpClient)
ref <- Ref.of(Enrichments[F](registry, configs, httpClient))
} yield ref
}
for {
// We don't want the HTTP client of API enrichment to be reinitialized each time the assets are refreshed
httpClient <- configs.collectFirst { case api: ApiRequestConf => api.api.timeout } match {
case Some(timeout) =>
Clients.mkHttp(readTimeout = timeout.millis).map(HttpClient.fromHttp4sClient[F])
case None =>
Resource.pure[F, HttpClient[F]](HttpClient.noop[F])
}
registry <- Resource.eval(buildRegistry[F](configs, blockingEC, shifter, httpClient))
ref <- Resource.eval(Ref.of(Enrichments[F](registry, configs, httpClient)))
} yield ref

def buildRegistry[F[_]: Async](
configs: List[EnrichmentConf],
blockingEC: ExecutionContext,
shifter: ShiftExecution[F],
httpClient: HttpClient[F]
httpApiEnrichment: HttpClient[F]
) =
EnrichmentRegistry.build[F](configs, shifter, httpClient, blockingEC).value.flatMap {
EnrichmentRegistry.build[F](configs, shifter, httpApiEnrichment, blockingEC).value.flatMap {
case Right(reg) => Async[F].pure(reg)
case Left(error) => Async[F].raiseError[EnrichmentRegistry[F]](new RuntimeException(error))
}
Expand Down Expand Up @@ -198,7 +205,6 @@ object Environment {
bad <- sinkBad
pii <- sinkPii.sequence
http4s <- Clients.mkHttp()
http = HttpClient.fromHttp4sClient(http4s)
clts <- clients.map(Clients.init(http4s, _))
igluClient <- IgluCirceClient.parseDefault[F](parsedConfigs.igluJson).resource
remoteAdaptersEnabled = file.remoteAdapters.configs.nonEmpty
Expand All @@ -210,7 +216,7 @@ object Environment {
sem <- Resource.eval(Semaphore(1L))
assetsState <- Resource.eval(Assets.State.make[F](sem, clts, assets))
shifter <- ShiftExecution.ofSingleThread[F]
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter, http)
enrichments <- Enrichments.make[F](parsedConfigs.enrichmentConfigs, blockingEC, shifter)
} yield Environment[F, A](
igluClient,
Http4sRegistryLookup(http4s),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import fs2.Stream
import org.http4s.{Headers, Request, Uri}
import org.http4s.client.defaults
import org.http4s.client.{Client => Http4sClient}
import org.http4s.client.middleware.{Retry, RetryPolicy}
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.pipeline.Command
import org.http4s.client.middleware.Retry
import org.http4s.ember.client.EmberClientBuilder

import Clients._

Expand Down Expand Up @@ -67,29 +66,16 @@ object Clients {
def mkHttp[F[_]: Async](
connectionTimeout: FiniteDuration = defaults.ConnectTimeout,
readTimeout: FiniteDuration = defaults.RequestTimeout,
maxConnections: Int = 10 // http4s uses 10 by default
): Resource[F, Http4sClient[F]] =
BlazeClientBuilder[F]
.withConnectTimeout(connectionTimeout)
.withRequestTimeout(readTimeout)
.withMaxTotalConnections(maxConnections)
.resource
.map(Retry[F](retryPolicy, redactHeadersWhen))

private def retryPolicy[F[_]] =
RetryPolicy[F](
backoff,
retriable = {
//EOF error has to be retried explicitly for blaze client, see https://github.com/snowplow/enrich/issues/692
case (_, Left(Command.EOF)) => true
case _ => false
}
)

//retry once after 100 mills
private def backoff(attemptNumber: Int): Option[FiniteDuration] =
if (attemptNumber > 1) None
else Some(100.millis)
maxConnections: Int = 100 // default of Ember client
): Resource[F, Http4sClient[F]] = {
val builder = EmberClientBuilder
.default[F]
.withTimeout(readTimeout)
.withIdleConnectionTime(connectionTimeout)
.withMaxTotal(maxConnections)
val retryPolicy = builder.retryPolicy
builder.build.map(Retry[F](retryPolicy, redactHeadersWhen))
}

private def redactHeadersWhen(header: CIString) =
(Headers.SensitiveHeaders + CIString("apikey")).contains(header)
Expand Down
2 changes: 2 additions & 0 deletions modules/common-fs2/src/test/resources/simplelogger.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.Enri
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.Assets=warn
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.test.TestEnvironment=info
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.common.fs2.test.HttpServer=info

org.slf4j.simpleLogger.log.org.http4s.client.middleware.Retry=error
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import cats.effect.unsafe.implicits.global

import cats.effect.testing.specs2.CatsEffect

import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

import com.snowplowanalytics.snowplow.enrich.common.fs2.io.Clients

Expand Down Expand Up @@ -116,9 +116,7 @@ class AssetsSpec extends Specification with CatsEffect with ScalaCheck {
for {
shiftExecution <- ShiftExecution.ofSingleThread[IO]
sem <- Resource.eval(Semaphore[IO](1L))
http4s <- Clients.mkHttp[IO]()
http = HttpClient.fromHttp4sClient(http4s)
enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution, http)
enrichments <- Environment.Enrichments.make[IO](List(), SpecHelpers.blockingEC, shiftExecution)
_ <- SpecHelpers.filesResource(TestFiles)
} yield (shiftExecution, sem, enrichments)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.apirequ
import com.snowplowanalytics.snowplow.enrich.common.fs2.enrichments.ApiRequestEnrichmentSpec.unstructEvent
import com.snowplowanalytics.snowplow.enrich.common.fs2.EnrichSpec
import com.snowplowanalytics.snowplow.enrich.common.fs2.test._
import com.snowplowanalytics.snowplow.badrows.BadRow

class ApiRequestEnrichmentSpec extends Specification with CatsEffect {

Expand Down Expand Up @@ -84,9 +85,44 @@ class ApiRequestEnrichmentSpec extends Specification with CatsEffect {
testWithHttp.use { test =>
test.run().map {
case (bad, pii, good) =>
(bad must be empty)
(pii must be empty)
(good.map(_.derived_contexts) must contain(exactly(expected)))
bad must beEmpty
pii must beEmpty
good.map(_.derived_contexts) must contain(exactly(expected))
}
}
}

"generate bad rows if API server is not available" in {
val nbEvents = 1000
val input = Stream((1 to nbEvents).toList: _*)
.map { i =>
json"""{
"schema": "iglu:com.acme/test/jsonschema/1-0-1",
"data": {"path": {"id": $i}}
}"""
}
.map { ue =>
EnrichSpec.collectorPayload.copy(
querystring = new BasicNameValuePair("ue_px", unstructEvent(ue)) :: EnrichSpec.collectorPayload.querystring
)
}
.map(_.toRaw)

val enrichment = ApiRequestConf(
SchemaKey("com.acme", "enrichment", "jsonschema", SchemaVer.Full(1, 0, 0)),
List(Input.Json("key1", "unstruct_event", SchemaCriterion("com.acme", "test", "jsonschema", 1), "$.path.id")),
HttpApi("GET", "http://foo.bar.unassigned/{{key1}}", 2000, Authentication(None)),
List(RegistryOutput("iglu:com.acme/output/jsonschema/1-0-0", Some(JsonOutput("$")))),
Cache(1, 1000),
ignoreOnError = false
)

TestEnvironment.make(input, List(enrichment)).use { test =>
test.run().map {
case (bad, pii, good) =>
good must beEmpty
pii must beEmpty
bad.collect { case ef: BadRow.EnrichmentFailures => ef } must haveSize(nbEvents)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ import com.snowplowanalytics.iglu.client.resolver.registries.{Http4sRegistryLook

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event

import com.snowplowanalytics.iglu.core.SelfDescribingData

import com.snowplowanalytics.snowplow.badrows.BadRow

import com.snowplowanalytics.snowplow.enrich.common.SpecHelpers.adaptersSchemas
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.adapters.registry.RemoteAdapter
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.EnrichmentConf
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
import com.snowplowanalytics.snowplow.enrich.common.utils.ShiftExecution

import com.snowplowanalytics.snowplow.enrich.common.fs2.{Assets, AttributedData, Enrich, EnrichSpec, Environment}
import com.snowplowanalytics.snowplow.enrich.common.fs2.Environment.{Enrichments, StreamsSettings}
Expand Down Expand Up @@ -112,7 +114,6 @@ object TestEnvironment extends CatsEffect {
def make(source: Stream[IO, Array[Byte]], enrichments: List[EnrichmentConf] = Nil): Resource[IO, TestEnvironment[Array[Byte]]] =
for {
http4s <- Clients.mkHttp[IO]()
http = HttpClient.fromHttp4sClient(http4s)
_ <- SpecHelpers.filesResource(enrichments.flatMap(_.filesToCache).map(p => Path(p._2)))
counter <- Resource.eval(Counter.make[IO])
metrics = Counter.mkCounterMetrics[IO](counter)
Expand All @@ -122,7 +123,7 @@ object TestEnvironment extends CatsEffect {
sem <- Resource.eval(Semaphore[IO](1L))
assetsState <- Resource.eval(Assets.State.make(sem, clients, enrichments.flatMap(_.filesToCache)))
shifter <- ShiftExecution.ofSingleThread[IO]
enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter, http)
enrichmentsRef <- Enrichments.make[IO](enrichments, SpecHelpers.blockingEC, shifter)
goodRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
piiRef <- Resource.eval(Ref.of[IO, Vector[AttributedData[Array[Byte]]]](Vector.empty))
badRef <- Resource.eval(Ref.of[IO, Vector[Array[Byte]]](Vector.empty))
Expand Down Expand Up @@ -168,7 +169,11 @@ object TestEnvironment extends CatsEffect {
.parse(badRowStr)
.getOrElse(throw new RuntimeException(s"Error parsing bad row json: $badRowStr"))
parsed
.as[BadRow]
.getOrElse(throw new RuntimeException(s"Error decoding bad row: $parsed"))
.as[SelfDescribingData[BadRow]] match {
case Left(e) =>
throw new RuntimeException(s"Error decoding bad row $parsed", e)
case Right(sdj) =>
sdj.data
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ object EnrichmentRegistry {
def build[F[_]: Async](
confs: List[EnrichmentConf],
shifter: ShiftExecution[F],
httpClient: HttpClient[F],
httpApiEnrichment: HttpClient[F],
blockingEC: ExecutionContext
): EitherT[F, String, EnrichmentRegistry[F]] =
confs.foldLeft(EitherT.pure[F, String](EnrichmentRegistry[F]())) { (er, e) =>
e match {
case c: ApiRequestConf =>
for {
enrichment <- EitherT.right(c.enrichment[F](httpClient))
enrichment <- EitherT.right(c.enrichment[F](httpApiEnrichment))
registry <- er
} yield registry.copy(apiRequest = enrichment.some)
case c: PiiPseudonymizerConf => er.map(_.copy(piiPseudonymizer = c.enrichment.some))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ package com.snowplowanalytics.snowplow.enrich.common.utils

import cats.effect.kernel.Sync
import cats.implicits._
import fs2.Stream
import org.typelevel.ci.CIString
import org.http4s.client.{Client => Http4sClient}
import org.http4s.headers.Authorization
import org.http4s.{BasicCredentials, EmptyBody, EntityBody, Header, Headers, Method, Request, Status, Uri}
import org.http4s.headers.{Authorization, `Content-Type`}
import org.http4s.{BasicCredentials, Header, Headers, MediaType, Method, Request, Status, Uri}

trait HttpClient[F[_]] {
def getResponse(
Expand Down Expand Up @@ -58,14 +57,17 @@ object HttpClient {
case Left(parseFailure) =>
Sync[F].pure(new IllegalArgumentException(s"uri [$uri] is not valid: ${parseFailure.sanitized}").asLeft[String])
case Right(validUri) =>
val request = Request[F](
val requestWithoutBody = Request[F](
uri = validUri,
method = Method.fromString(method).getOrElse(Method.GET),
body = body.fold[EntityBody[F]](EmptyBody)(s => Stream.emits(s.getBytes)),
headers = getHeaders(authUser, authPassword)
)
val requestWithBody = body.fold(requestWithoutBody)(s =>
requestWithoutBody.withEntity(s).withContentType(`Content-Type`(MediaType.application.json))
)

http4sClient
.run(request)
.run(requestWithBody)
.use[Either[Throwable, String]] { response =>
val body = response.bodyText.compile.string
response.status.responseClass match {
Expand All @@ -76,4 +78,16 @@ object HttpClient {
.handleError(_.asLeft[String])
}
}

def noop[F[_]: Sync]: HttpClient[F] =
new HttpClient[F] {
override def getResponse(
uri: String,
authUser: Option[String],
authPassword: Option[String],
body: Option[String],
method: String
): F[Either[Throwable, String]] =
Sync[F].raiseError(new IllegalStateException("HTTP client not implemented"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.unsafe.implicits.global

import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.ember.client.EmberClientBuilder

import cats.effect.testing.specs2.CatsEffect

Expand Down Expand Up @@ -91,8 +91,8 @@ object SpecHelpers extends CatsEffect {
val registryLookup = JavaNetRegistryLookup.ioLookupInstance[IO]

val blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
private val http4sClient = BlazeClientBuilder[IO].resource
val httpClient = http4sClient.map(HttpClient.fromHttp4sClient[IO])

val httpClient = EmberClientBuilder.default[IO].build.map(HttpClient.fromHttp4sClient[IO])

private type NvPair = (String, String)

Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ object Dependencies {
val scalaWeather = "2.0.0"
val gatlingJsonpath = "0.6.14"
val scalaUri = "1.5.1"
val badRows = "2.2.1"
val badRows = "2.3.0"
val igluClient = "3.1.0"

val snowplowRawEvent = "0.1.0"
Expand Down Expand Up @@ -194,7 +194,7 @@ object Dependencies {
.exclude("software.amazon.glue", "schema-registry-serde")
val stsSdk2 = "software.amazon.awssdk" % "sts" % V.awsSdk2 % Runtime
val azureIdentity = "com.azure" % "azure-identity" % V.azureIdentity
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s
val http4sClient = "org.http4s" %% "http4s-ember-client" % V.http4s
val http4sCirce = "org.http4s" %% "http4s-circe" % V.http4s
val log4cats = "org.typelevel" %% "log4cats-slf4j" % V.log4cats
val catsRetry = "com.github.cb372" %% "cats-retry" % V.catsRetry
Expand Down
Loading