-
Notifications
You must be signed in to change notification settings - Fork 20
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Implement Redis support via Lettuce (#382)
* feat: Implement Redis support via Lettuce Closes #51 * fix: Compilation error * docs: Add simple Lettuce documentation
- Loading branch information
1 parent
91d2cff
commit 3b4232f
Showing
7 changed files
with
262 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
54 changes: 54 additions & 0 deletions
54
lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/ConfigReaders.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package com.avast.sst.lettuce.pureconfig | ||
|
||
import java.nio.charset.Charset | ||
|
||
import cats.syntax.either._ | ||
import com.avast.sst.lettuce.LettuceConfig | ||
import com.avast.sst.lettuce.LettuceConfig.{SocketOptions, SslOptions, TimeoutOptions} | ||
import io.lettuce.core.ClientOptions.DisconnectedBehavior | ||
import io.lettuce.core.protocol.ProtocolVersion | ||
import pureconfig.ConfigReader | ||
import pureconfig.error.CannotConvert | ||
import pureconfig.generic.ProductHint | ||
import pureconfig.generic.semiauto.deriveReader | ||
|
||
trait ConfigReaders { | ||
|
||
implicit protected def hint[T]: ProductHint[T] = ProductHint.default | ||
|
||
implicit val lettuceDisconnectedBehaviorConfigReader: ConfigReader[DisconnectedBehavior] = ConfigReader.stringConfigReader.emap { | ||
case "DEFAULT" => DisconnectedBehavior.DEFAULT.asRight | ||
case "ACCEPT_COMMANDS" => DisconnectedBehavior.ACCEPT_COMMANDS.asRight | ||
case "REJECT_COMMANDS" => DisconnectedBehavior.REJECT_COMMANDS.asRight | ||
case unknown => | ||
CannotConvert( | ||
unknown, | ||
"DisconnectedBehavior", | ||
s"Unknown enum value: ${DisconnectedBehavior.values().map(_.name()).mkString("|")}" | ||
).asLeft | ||
} | ||
|
||
implicit val lettuceProtocolVersionConfigReader: ConfigReader[ProtocolVersion] = ConfigReader.stringConfigReader.emap { | ||
case "RESP2" => ProtocolVersion.RESP2.asRight | ||
case "RESP3" => ProtocolVersion.RESP3.asRight | ||
case unknown => | ||
CannotConvert( | ||
unknown, | ||
"ProtocolVersion", | ||
s"Unknown enum value: ${ProtocolVersion.values().map(_.name()).mkString("|")}" | ||
).asLeft | ||
} | ||
|
||
implicit val lettuceCharsetConfigReader: ConfigReader[Charset] = ConfigReader.stringConfigReader.emap { charset => | ||
Either.catchNonFatal(Charset.forName(charset)).leftMap(ex => CannotConvert(charset, "java.nio.Charset", ex.getMessage)) | ||
} | ||
|
||
implicit val lettuceSocketOptionsReader: ConfigReader[SocketOptions] = deriveReader | ||
|
||
implicit val lettuceSslOptionsReader: ConfigReader[SslOptions] = deriveReader | ||
|
||
implicit val lettuceTimeoutOptionsReader: ConfigReader[TimeoutOptions] = deriveReader | ||
|
||
implicit val lettuceConfigReader: ConfigReader[LettuceConfig] = deriveReader | ||
|
||
} |
20 changes: 20 additions & 0 deletions
20
lettuce-pureconfig/src/main/scala/com/avast/sst/lettuce/pureconfig/implicits.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package com.avast.sst.lettuce.pureconfig | ||
|
||
import pureconfig.ConfigFieldMapping | ||
import pureconfig.generic.ProductHint | ||
|
||
/** Contains [[pureconfig.ConfigReader]] instances with default "kebab-case" naming convention. */ | ||
object implicits extends ConfigReaders { | ||
|
||
/** Contains [[pureconfig.ConfigReader]] instances with "kebab-case" naming convention. | ||
* | ||
* This is alias for the default `implicits._` import. | ||
*/ | ||
object KebabCase extends ConfigReaders | ||
|
||
/** Contains [[pureconfig.ConfigReader]] instances with "camelCase" naming convention. */ | ||
object CamelCase extends ConfigReaders { | ||
implicit override protected def hint[T]: ProductHint[T] = ProductHint(ConfigFieldMapping(pureconfig.CamelCase, pureconfig.CamelCase)) | ||
} | ||
|
||
} |
46 changes: 46 additions & 0 deletions
46
lettuce/src/main/scala/com/avast/sst/lettuce/LettuceConfig.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package com.avast.sst.lettuce | ||
|
||
import java.nio.charset.Charset | ||
|
||
import com.avast.sst.lettuce.LettuceConfig.{SocketOptions, SslOptions, TimeoutOptions} | ||
import io.lettuce.core.ClientOptions.DisconnectedBehavior | ||
import io.lettuce.core.protocol.ProtocolVersion | ||
import io.lettuce.core.{ClientOptions, SocketOptions => LettuceSocketOptions, TimeoutOptions => LettuceTimeoutOptions} | ||
|
||
import scala.concurrent.duration.Duration | ||
|
||
final case class LettuceConfig( | ||
uri: String, | ||
pingBeforeActivateConnection: Boolean = ClientOptions.DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION, | ||
autoReconnect: Boolean = ClientOptions.DEFAULT_AUTO_RECONNECT, | ||
cancelCommandsOnReconnectFailure: Boolean = ClientOptions.DEFAULT_CANCEL_CMD_RECONNECT_FAIL, | ||
suspendReconnectOnProtocolFailure: Boolean = ClientOptions.DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL, | ||
requestQueueSize: Int = ClientOptions.DEFAULT_REQUEST_QUEUE_SIZE, | ||
disconnectedBehavior: DisconnectedBehavior = DisconnectedBehavior.DEFAULT, | ||
protocolVersion: Option[ProtocolVersion] = None, | ||
scriptCharset: Charset = ClientOptions.DEFAULT_SCRIPT_CHARSET, | ||
publishOnScheduler: Boolean = ClientOptions.DEFAULT_SUSPEND_RECONNECT_PROTO_FAIL, | ||
socketOptions: SocketOptions = SocketOptions(), | ||
sslOptions: SslOptions = SslOptions(), | ||
timeoutOptions: TimeoutOptions = TimeoutOptions() | ||
) | ||
|
||
object LettuceConfig { | ||
|
||
final case class SocketOptions( | ||
connectTimeout: Duration = Duration.fromNanos(LettuceSocketOptions.DEFAULT_CONNECT_TIMEOUT_DURATION.toNanos), | ||
keepAlive: Boolean = LettuceSocketOptions.DEFAULT_SO_KEEPALIVE, | ||
tcpNoDelay: Boolean = LettuceSocketOptions.DEFAULT_SO_NO_DELAY | ||
) | ||
|
||
final case class SslOptions( | ||
keyStoreType: Option[String] = None, | ||
keyStorePath: Option[String] = None, | ||
keyStorePassword: Option[String] = None, | ||
trustStorePath: Option[String] = None, | ||
trustStorePassword: Option[String] = None | ||
) | ||
|
||
final case class TimeoutOptions(timeoutCommands: Boolean = LettuceTimeoutOptions.DEFAULT_TIMEOUT_COMMANDS) | ||
|
||
} |
95 changes: 95 additions & 0 deletions
95
lettuce/src/main/scala/com/avast/sst/lettuce/LettuceModule.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package com.avast.sst.lettuce | ||
|
||
import java.io.File | ||
import java.time.Duration | ||
|
||
import cats.effect.{Async, Resource, Sync} | ||
import cats.syntax.either._ | ||
import io.lettuce.core.api.StatefulRedisConnection | ||
import io.lettuce.core.codec.RedisCodec | ||
import io.lettuce.core.resource.ClientResources | ||
import io.lettuce.core.{ClientOptions, RedisClient, RedisURI, SocketOptions, SslOptions, TimeoutOptions} | ||
|
||
object LettuceModule { | ||
|
||
/** Makes [[io.lettuce.core.RedisClient]] initialized with the given config and optionally [[io.lettuce.core.resource.ClientResources]]. */ | ||
def makeClient[F[_]: Sync](config: LettuceConfig, clientResources: Option[ClientResources] = None): Resource[F, RedisClient] = { | ||
val create = clientResources match { | ||
case Some(resources) => RedisClient.create(resources) | ||
case None => RedisClient.create() | ||
} | ||
val sync = Sync[F] | ||
Resource.make { | ||
sync.delay { | ||
val client = create | ||
client.setOptions(makeClientOptions(config)) | ||
client | ||
} | ||
}(c => sync.delay(c.shutdown())) | ||
} | ||
|
||
/** Makes [[io.lettuce.core.api.StatefulRedisConnection]] initialized with the given config and optionally [[io.lettuce.core.resource.ClientResources]]. */ | ||
def makeConnection[F[_]: Async, K, V]( | ||
config: LettuceConfig, | ||
clientResources: Option[ClientResources] = None | ||
)(implicit codec: RedisCodec[K, V]): Resource[F, StatefulRedisConnection[K, V]] = { | ||
makeClient[F](config, clientResources).flatMap { client => | ||
val async = Async[F] | ||
Resource.make[F, StatefulRedisConnection[K, V]] { | ||
async.asyncF[StatefulRedisConnection[K, V]] { cb => | ||
async.delay { | ||
client | ||
.connectAsync(codec, RedisURI.create(config.uri)) | ||
.handle[Unit] { (connection, ex) => | ||
if (ex == null) { | ||
cb(connection.asRight) | ||
} else { | ||
cb(ex.asLeft) | ||
} | ||
} | ||
() | ||
} | ||
} | ||
}(c => async.delay(c.close())) | ||
} | ||
} | ||
|
||
private def makeClientOptions(config: LettuceConfig): ClientOptions = | ||
ClientOptions | ||
.builder() | ||
.pingBeforeActivateConnection(config.pingBeforeActivateConnection) | ||
.autoReconnect(config.autoReconnect) | ||
.cancelCommandsOnReconnectFailure(config.cancelCommandsOnReconnectFailure) | ||
.suspendReconnectOnProtocolFailure(config.suspendReconnectOnProtocolFailure) | ||
.requestQueueSize(config.requestQueueSize) | ||
.disconnectedBehavior(config.disconnectedBehavior) | ||
.protocolVersion(config.protocolVersion.orNull) | ||
.scriptCharset(config.scriptCharset) | ||
.publishOnScheduler(config.publishOnScheduler) | ||
.socketOptions( | ||
SocketOptions | ||
.builder() | ||
.connectTimeout(Duration.ofNanos(config.socketOptions.connectTimeout.toNanos)) | ||
.keepAlive(config.socketOptions.keepAlive) | ||
.tcpNoDelay(config.socketOptions.tcpNoDelay) | ||
.build() | ||
) | ||
.timeoutOptions(TimeoutOptions.builder().timeoutCommands(config.timeoutOptions.timeoutCommands).build()) | ||
.sslOptions { | ||
val opts = SslOptions | ||
.builder() | ||
.jdkSslProvider() | ||
|
||
config.sslOptions.keyStoreType.foreach(opts.keyStoreType) | ||
config.sslOptions.keyStorePath.zip(config.sslOptions.keyStorePassword).foreach { case (path, pass) => | ||
opts.keystore(new File(path), pass.toCharArray) | ||
} | ||
config.sslOptions.trustStorePath.zip(config.sslOptions.trustStorePassword).foreach { case (path, pass) => | ||
opts.truststore(new File(path), pass) | ||
} | ||
|
||
opts.build() | ||
} | ||
.build() | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
--- | ||
layout: docs | ||
title: "Lettuce (Redis)" | ||
--- | ||
|
||
# FS2 Kafka | ||
|
||
`libraryDependencies += "com.avast" %% "sst-lettuce" % "@VERSION@"` | ||
|
||
This subproject initializes [Lettuce](https://lettuce.io) Redis driver: | ||
|
||
```scala mdoc:silent | ||
import cats.effect.Resource | ||
import com.avast.sst.lettuce.{LettuceConfig, LettuceModule} | ||
import io.lettuce.core.codec.{RedisCodec, StringCodec} | ||
import zio._ | ||
import zio.interop.catz._ | ||
|
||
implicit val runtime = zio.Runtime.default // this is just needed in example | ||
|
||
implicit val lettuceCodec: RedisCodec[String, String] = StringCodec.UTF8 | ||
|
||
for { | ||
connection <- LettuceModule.makeConnection[Task, String, String](LettuceConfig("redis://localhost")) | ||
value <- Resource.liftF(Task.effect(connection.sync().get("key"))) | ||
} yield value | ||
``` |