diff --git a/build.sbt b/build.sbt index 0fa9473ba..6d515f06f 100644 --- a/build.sbt +++ b/build.sbt @@ -26,6 +26,8 @@ lazy val root = project jvm, jvmMicrometer, jvmPureConfig, + lettuce, + lettucePureConfig, micrometer, micrometerJmx, micrometerJmxPureConfig, @@ -323,6 +325,23 @@ lazy val jvmPureConfig = project libraryDependencies += Dependencies.pureConfig ) +lazy val lettuce = project + .in(file("lettuce")) + .settings(BuildSettings.common) + .settings( + name := "sst-lettuce", + libraryDependencies += Dependencies.lettuce + ) + +lazy val lettucePureConfig = project + .in(file("lettuce-pureconfig")) + .dependsOn(lettuce) + .settings(BuildSettings.common) + .settings( + name := "sst-lettuce-pureconfig", + libraryDependencies += Dependencies.pureConfig + ) + lazy val micrometer = project .in(file("micrometer")) .settings(BuildSettings.common) diff --git a/lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/ConfigReaders.scala b/lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/ConfigReaders.scala new file mode 100644 index 000000000..ce2709dd5 --- /dev/null +++ b/lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/ConfigReaders.scala @@ -0,0 +1,54 @@ +package com.avast.sst.lettuce.pureconfig + +import java.nio.charset.Charset + +import cats.syntax.either._ +import com.avast.sst.lettuce.LettuceConfig +import com.avast.sst.lettuce.LettuceConfig.{SocketOptions, SslOptions, TimeoutOptions} +import io.lettuce.core.ClientOptions.DisconnectedBehavior +import io.lettuce.core.protocol.ProtocolVersion +import pureconfig.ConfigReader +import pureconfig.error.CannotConvert +import pureconfig.generic.ProductHint +import pureconfig.generic.semiauto.deriveReader + +trait ConfigReaders { + + implicit protected def hint[T]: ProductHint[T] = ProductHint.default + + implicit val lettuceDisconnectedBehaviorConfigReader: ConfigReader[DisconnectedBehavior] = ConfigReader.stringConfigReader.emap { + case "DEFAULT" => DisconnectedBehavior.DEFAULT.asRight + case "ACCEPT_COMMANDS" => DisconnectedBehavior.ACCEPT_COMMANDS.asRight + case "REJECT_COMMANDS" => DisconnectedBehavior.REJECT_COMMANDS.asRight + case unknown => + CannotConvert( + unknown, + "DisconnectedBehavior", + s"Unknown enum value: ${DisconnectedBehavior.values().map(_.name()).mkString("|")}" + ).asLeft + } + + implicit val lettuceProtocolVersionConfigReader: ConfigReader[ProtocolVersion] = ConfigReader.stringConfigReader.emap { + case "RESP2" => ProtocolVersion.RESP2.asRight + case "RESP3" => ProtocolVersion.RESP3.asRight + case unknown => + CannotConvert( + unknown, + "ProtocolVersion", + s"Unknown enum value: ${ProtocolVersion.values().map(_.name()).mkString("|")}" + ).asLeft + } + + implicit val lettuceCharsetConfigReader: ConfigReader[Charset] = ConfigReader.stringConfigReader.emap { charset => + Either.catchNonFatal(Charset.forName(charset)).leftMap(ex => CannotConvert(charset, "java.nio.Charset", ex.getMessage)) + } + + implicit val lettuceSocketOptionsReader: ConfigReader[SocketOptions] = deriveReader + + implicit val lettuceSslOptionsReader: ConfigReader[SslOptions] = deriveReader + + implicit val lettuceTimeoutOptionsReader: ConfigReader[TimeoutOptions] = deriveReader + + implicit val lettuceConfigReader: ConfigReader[LettuceConfig] = deriveReader + +} diff --git a/lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/implicits.scala b/lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/implicits.scala new file mode 100644 index 000000000..6b655f1e4 --- /dev/null +++ b/lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/implicits.scala @@ -0,0 +1,20 @@ +package com.avast.sst.lettuce.pureconfig + +import pureconfig.ConfigFieldMapping +import pureconfig.generic.ProductHint + +/** Contains [[pureconfig.ConfigReader]] instances with default "kebab-case" naming convention. */ +object implicits extends ConfigReaders { + + /** Contains [[pureconfig.ConfigReader]] instances with "kebab-case" naming convention. + * + * This is alias for the default `implicits._` import. + */ + object KebabCase extends ConfigReaders + + /** Contains [[pureconfig.ConfigReader]] instances with "camelCase" naming convention. */ + object CamelCase extends ConfigReaders { + implicit override protected def hint[T]: ProductHint[T] = ProductHint(ConfigFieldMapping(pureconfig.CamelCase, pureconfig.CamelCase)) + } + +} diff --git a/lettuce/src/main/scala/com/avast/sst/lettuce/LettuceConfig.scala b/lettuce/src/main/scala/com/avast/sst/lettuce/LettuceConfig.scala new file mode 100644 index 000000000..f7fef61ca --- /dev/null +++ b/lettuce/src/main/scala/com/avast/sst/lettuce/LettuceConfig.scala @@ -0,0 +1,46 @@ +package com.avast.sst.lettuce + +import java.nio.charset.Charset + +import com.avast.sst.lettuce.LettuceConfig.{SocketOptions, SslOptions, TimeoutOptions} +import io.lettuce.core.ClientOptions.DisconnectedBehavior +import io.lettuce.core.protocol.ProtocolVersion +import io.lettuce.core.{ClientOptions, SocketOptions => LettuceSocketOptions, TimeoutOptions => LettuceTimeoutOptions} + +import scala.concurrent.duration.Duration + +final case class LettuceConfig( + uri: String, + pingBeforeActivateConnection: Boolean = ClientOptions.DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION, + autoReconnect: Boolean = ClientOptions.DEFAULT_AUTO_RECONNECT, + cancelCommandsOnReconnectFailure: Boolean = ClientOptions.DEFAULT_CANCEL_CMD_RECONNECT_FAIL, + suspendReconnectOnProtocolFailure: Boolean = ClientOptions.DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL, + requestQueueSize: Int = ClientOptions.DEFAULT_REQUEST_QUEUE_SIZE, + disconnectedBehavior: DisconnectedBehavior = DisconnectedBehavior.DEFAULT, + protocolVersion: Option[ProtocolVersion] = None, + scriptCharset: Charset = ClientOptions.DEFAULT_SCRIPT_CHARSET, + publishOnScheduler: Boolean = ClientOptions.DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL, + socketOptions: SocketOptions = SocketOptions(), + sslOptions: SslOptions = SslOptions(), + timeoutOptions: TimeoutOptions = TimeoutOptions() +) + +object LettuceConfig { + + final case class SocketOptions( + connectTimeout: Duration = Duration.fromNanos(LettuceSocketOptions.DEFAULT_CONNECT_TIMEOUT_DURATION.toNanos), + keepAlive: Boolean = LettuceSocketOptions.DEFAULT_SO_KEEPALIVE, + tcpNoDelay: Boolean = LettuceSocketOptions.DEFAULT_SO_NO_DELAY + ) + + final case class SslOptions( + keyStoreType: Option[String] = None, + keyStorePath: Option[String] = None, + keyStorePassword: Option[String] = None, + trustStorePath: Option[String] = None, + trustStorePassword: Option[String] = None + ) + + final case class TimeoutOptions(timeoutCommands: Boolean = LettuceTimeoutOptions.DEFAULT_TIMEOUT_COMMANDS) + +} diff --git a/lettuce/src/main/scala/com/avast/sst/lettuce/LettuceModule.scala b/lettuce/src/main/scala/com/avast/sst/lettuce/LettuceModule.scala new file mode 100644 index 000000000..88b52bfcb --- /dev/null +++ b/lettuce/src/main/scala/com/avast/sst/lettuce/LettuceModule.scala @@ -0,0 +1,95 @@ +package com.avast.sst.lettuce + +import java.io.File +import java.time.Duration + +import cats.effect.{Async, Resource, Sync} +import cats.syntax.either._ +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.codec.RedisCodec +import io.lettuce.core.resource.ClientResources +import io.lettuce.core.{ClientOptions, RedisClient, RedisURI, SocketOptions, SslOptions, TimeoutOptions} + +object LettuceModule { + + /** Makes [[io.lettuce.core.RedisClient]] initialized with the given config and optionally [[io.lettuce.core.resource.ClientResources]]. */ + def makeClient[F[_]: Sync](config: LettuceConfig, clientResources: Option[ClientResources] = None): Resource[F, RedisClient] = { + val create = clientResources match { + case Some(resources) => RedisClient.create(resources) + case None => RedisClient.create() + } + val sync = Sync[F] + Resource.make { + sync.delay { + val client = create + client.setOptions(makeClientOptions(config)) + client + } + }(c => sync.delay(c.shutdown())) + } + + /** Makes [[io.lettuce.core.api.StatefulRedisConnection]] initialized with the given config and optionally [[io.lettuce.core.resource.ClientResources]]. */ + def makeConnection[F[_]: Async, K, V]( + config: LettuceConfig, + clientResources: Option[ClientResources] = None + )(implicit codec: RedisCodec[K, V]): Resource[F, StatefulRedisConnection[K, V]] = { + makeClient[F](config, clientResources).flatMap { client => + val async = Async[F] + Resource.make[F, StatefulRedisConnection[K, V]] { + async.asyncF[StatefulRedisConnection[K, V]] { cb => + async.delay { + client + .connectAsync(codec, RedisURI.create(config.uri)) + .handle[Unit] { (connection, ex) => + if (ex == null) { + cb(connection.asRight) + } else { + cb(ex.asLeft) + } + } + () + } + } + }(c => async.delay(c.close())) + } + } + + private def makeClientOptions(config: LettuceConfig): ClientOptions = + ClientOptions + .builder() + .pingBeforeActivateConnection(config.pingBeforeActivateConnection) + .autoReconnect(config.autoReconnect) + .cancelCommandsOnReconnectFailure(config.cancelCommandsOnReconnectFailure) + .suspendReconnectOnProtocolFailure(config.suspendReconnectOnProtocolFailure) + .requestQueueSize(config.requestQueueSize) + .disconnectedBehavior(config.disconnectedBehavior) + .protocolVersion(config.protocolVersion.orNull) + .scriptCharset(config.scriptCharset) + .publishOnScheduler(config.publishOnScheduler) + .socketOptions( + SocketOptions + .builder() + .connectTimeout(Duration.ofNanos(config.socketOptions.connectTimeout.toNanos)) + .keepAlive(config.socketOptions.keepAlive) + .tcpNoDelay(config.socketOptions.tcpNoDelay) + .build() + ) + .timeoutOptions(TimeoutOptions.builder().timeoutCommands(config.timeoutOptions.timeoutCommands).build()) + .sslOptions { + val opts = SslOptions + .builder() + .jdkSslProvider() + + config.sslOptions.keyStoreType.foreach(opts.keyStoreType) + config.sslOptions.keyStorePath.zip(config.sslOptions.keyStorePassword).foreach { case (path, pass) => + opts.keystore(new File(path), pass.toCharArray) + } + config.sslOptions.trustStorePath.zip(config.sslOptions.trustStorePassword).foreach { case (path, pass) => + opts.truststore(new File(path), pass) + } + + opts.build() + } + .build() + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c942e33dc..0d572897a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -18,6 +18,7 @@ object Dependencies { val http4sServer = "org.http4s" %% "http4s-server" % Versions.http4s val jsr305 = "com.google.code.findbugs" % "jsr305" % "3.0.2" val kindProjector = "org.typelevel" % "kind-projector" % "0.11.0" cross CrossVersion.full + val lettuce = "io.lettuce" % "lettuce-core" % "6.0.1.RELEASE" val logbackClassic = "ch.qos.logback" % "logback-classic" % "1.2.3" val micrometerCore = "io.micrometer" % "micrometer-core" % Versions.micrometerCore val micrometerJmx = "io.micrometer" % "micrometer-registry-jmx" % Versions.micrometerJmx diff --git a/site/docs/subprojects/lettuce.md b/site/docs/subprojects/lettuce.md new file mode 100644 index 000000000..f21813eaf --- /dev/null +++ b/site/docs/subprojects/lettuce.md @@ -0,0 +1,27 @@ +--- +layout: docs +title: "Lettuce (Redis)" +--- + +# FS2 Kafka + +`libraryDependencies += "com.avast" %% "sst-lettuce" % "@VERSION@"` + +This subproject initializes [Lettuce](https://lettuce.io) Redis driver: + +```scala mdoc:silent +import cats.effect.Resource +import com.avast.sst.lettuce.{LettuceConfig, LettuceModule} +import io.lettuce.core.codec.{RedisCodec, StringCodec} +import zio._ +import zio.interop.catz._ + +implicit val runtime = zio.Runtime.default // this is just needed in example + +implicit val lettuceCodec: RedisCodec[String, String] = StringCodec.UTF8 + +for { +connection <- LettuceModule.makeConnection[Task, String, String](LettuceConfig("redis://localhost")) +value <- Resource.liftF(Task.effect(connection.sync().get("key"))) +} yield value +```