Merge pull request #143 from RedisLabs/structured-stream
Structured streaming: fix #135 and #142
fe2s authored Mar 24, 2019
2 parents a786380 + 1ed2ca9 commit 99462e6
Showing 6 changed files with 69 additions and 44 deletions.
doc/structured-streaming.md (7 changes: 6 additions & 1 deletion)
@@ -101,7 +101,7 @@ Each consumer will be mapped to a Spark partition. There is no ordering guarantee

### Other configuration

The spark-redis automatically creates a consumer group with name `spark-source` if it doesn't exist. You can customize the consumer group name with
Spark-redis automatically creates a consumer group with name `spark-source` if it doesn't exist. You can customize the consumer group name with
`.option("stream.group.name", "my-group")`. Also you can customize the name of consumers in consumer group with `.option("stream.consumer.prefix", "my-consumer")`.


@@ -112,3 +112,8 @@ The default values are 100 items and 500 ms.
.option("stream.read.batch.size", 200)
.option("stream.read.block", 1000)
```
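
For illustration, a minimal reader that combines the consumer group, consumer prefix, batch size and block options described above might look like the sketch below. The `redis` format name, the `stream.keys` option and the sensor schema are assumptions taken from the surrounding structured streaming documentation, not from this change.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .getOrCreate()

// Read the "sensors" stream through a custom consumer group, fetching up to
// 200 entries per Redis call and blocking for at most 1000 ms when the stream is empty.
val sensors = spark.readStream
  .format("redis")
  .option("stream.keys", "sensors")
  .option("stream.group.name", "my-group")
  .option("stream.consumer.prefix", "my-consumer")
  .option("stream.read.batch.size", 200)
  .option("stream.read.block", 1000)
  .schema(StructType(Seq(
    StructField("sensor-id", StringType),
    StructField("temperature", FloatType))))
  .load()
```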

### Fault Tolerance Semantics

Spark-redis provides a replayable source, so by enabling [checkpointing](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing) and using
idempotent sinks, one can ensure end-to-end exactly-once semantics under any failure. If checkpointing is not enabled, messages may be lost.
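
A hedged sketch of that recovery setup, reusing the `sensors` streaming DataFrame from the reader example above and an arbitrary local checkpoint directory:

```scala
// With a checkpoint location configured, committed offsets survive a restart,
// so the resumed query continues from the last completed batch; combined with
// an idempotent sink this gives end-to-end exactly-once behaviour.
val query = sensors.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/spark-redis-checkpoints/sensors")
  .start()

query.awaitTermination()
```
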
RedisSource.scala
@@ -145,8 +145,8 @@ object RedisSource {
def getOffsetRanges(start: Option[Offset], end: Offset,
consumerConfigs: Seq[RedisConsumerConfig]): Seq[RedisSourceOffsetRange] = {

val offsetStarts = start.map(_.asInstanceOf[RedisSourceOffset]).map(_.offsets).getOrElse(Map())
val offsetEnds = end.asInstanceOf[RedisSourceOffset]
val offsetStarts = start.map(RedisSourceOffset.fromOffset).map(_.offsets).getOrElse(Map())
val offsetEnds = RedisSourceOffset.fromOffset(end)
val configsByStreamKey = consumerConfigs.groupBy(_.streamKey)

offsetEnds.offsets.flatMap { case (streamKey, offsetEnd) =>
RedisSourceOffset.scala
@@ -1,7 +1,7 @@
package org.apache.spark.sql.redis.stream

import com.redislabs.provider.redis.util.JsonUtils
import org.apache.spark.sql.execution.streaming.Offset
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.json4s.jackson.Serialization
import org.json4s.{Formats, NoTypeHints}

@@ -18,6 +18,18 @@ object RedisSourceOffset {

private implicit val formats: Formats = Serialization.formats(NoTypeHints)

def fromOffset(offset: Offset): RedisSourceOffset = {
offset match {
case o: RedisSourceOffset => o
case so: SerializedOffset => fromJson(so.json)
case _ =>
throw new IllegalArgumentException(
s"Invalid conversion from offset of ${offset.getClass} to RedisSourceOffset")
}
}

def fromJson(json: String): RedisSourceOffset = {
try {
Serialization.read[RedisSourceOffset](json)
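
For context, a small hedged sketch of how `fromOffset` is exercised: on recovery from a checkpoint, Spark typically hands the source a `SerializedOffset` carrying raw JSON rather than a `RedisSourceOffset`, and `fromOffset` accepts either form. The JSON shape below matches the `stream.offsets` format used later in the test suite; the stream key and offset values are made up for illustration.

```scala
import org.apache.spark.sql.execution.streaming.SerializedOffset

// Offset as restored from a checkpoint: raw JSON wrapped in SerializedOffset.
val restored = SerializedOffset(
  """{"offsets":{"sensors":{"groupName":"spark-source","offset":"0-10"}}}""")

val fromCheckpoint = RedisSourceOffset.fromOffset(restored)       // parsed from the JSON
val passedThrough  = RedisSourceOffset.fromOffset(fromCheckpoint) // returned unchanged
```
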
RedisSourceRdd.scala
@@ -19,14 +19,7 @@ class RedisSourceRdd(sc: SparkContext, redisConfig: RedisConfig,
val partition = split.asInstanceOf[RedisSourceRddPartition]
val offsetRange = partition.offsetRange
val streamReader = new RedisStreamReader(redisConfig)
if (offsetRange.start.isDefined) {
// offset is defined, read by offset
streamReader.streamEntriesByOffset(offsetRange)
} else {
// offset is not defined, happens for the first batch or after spark restart
// read starting from where the point the consumer group ended
streamReader.unreadStreamEntries(offsetRange)
}
streamReader.unreadStreamEntries(offsetRange)
}

override protected def getPartitions: Array[Partition] = {
RedisStreamReader.scala
@@ -18,33 +18,11 @@ import scala.math.Ordering.Implicits._
*/
class RedisStreamReader(redisConfig: RedisConfig) extends Logging with Serializable {

def streamEntriesByOffset(offsetRange: RedisSourceOffsetRange): Iterator[StreamEntry] = {
val config = offsetRange.config

logInfo(s"Reading stream entries with given offset " +
s"[${config.streamKey}, ${config.groupName}, ${config.consumerName} ${offsetRange.start}]...")

filterStreamEntries(offsetRange) {
val initialStart = offsetRange.start.map(id => new EntryID(id)).getOrElse(throw new RuntimeException("Offset start is not set"))
val initialEntry = new SimpleEntry(config.streamKey, initialStart)
Iterator.iterate(readStreamEntryBatches(offsetRange, initialEntry)) { response =>
val responseOption = for {
lastEntries <- response.asScala.lastOption
lastEntry <- lastEntries.getValue.asScala.lastOption
lastEntryId = lastEntry.getID
startEntryId = new EntryID(lastEntryId.getTime, lastEntryId.getSequence)
startEntryOffset = new SimpleEntry(config.streamKey, startEntryId)
} yield readStreamEntryBatches(offsetRange, startEntryOffset)
responseOption.getOrElse(new util.ArrayList)
}
}
}

def unreadStreamEntries(offsetRange: RedisSourceOffsetRange): Iterator[StreamEntry] = {
val config = offsetRange.config

logInfo(s"Reading unread stream entries " +
s"[${config.streamKey}, ${config.groupName}, ${config.consumerName}]... ")
logInfo(s"Reading entries " +
s"[${config.streamKey}, ${config.groupName}, ${config.consumerName}, start=${offsetRange.start} end=${offsetRange.end}]... ")

val res = filterStreamEntries(offsetRange) {
val startEntryOffset = new SimpleEntry(config.streamKey, EntryID.UNRECEIVED_ENTRY)
RedisStreamSourceSuite.scala
@@ -1,5 +1,8 @@
package org.apache.spark.sql.redis.stream

import java.io.File
import java.util.UUID

import com.redislabs.provider.redis.RedisConfig
import com.redislabs.provider.redis.env.Env
import com.redislabs.provider.redis.util.ConnectionUtils.{JedisExt, XINFO, withConnection}
@@ -96,7 +99,8 @@ trait RedisStreamSourceSuite extends FunSuite with Matchers with Env {
}

// re-read from the beginning
val offsetJson = s"""{"offsets":{"$streamKey":{"groupName":"redis-source","offset":"0-0"}}}"""
val offsetJson =
s"""{"offsets":{"$streamKey":{"groupName":"redis-source","offset":"0-0"}}}"""
val options = Map("stream.offsets" -> offsetJson)

readStream(streamKey, options) { spark =>
@@ -195,8 +199,34 @@ trait RedisStreamSourceSuite extends FunSuite with Matchers with Env {
}
}

def readStream(streamKey: String, extraOptions: Map[String, String] = Map())(body: SparkSession => Unit): Unit = {
val (spark, query) = readStream2(streamKey, extraOptions)
test("can start stream with checkpointing") {
val streamKey = Person.generatePersonStreamKey()
withConnection(streamKey) { conn =>
val checkPointLocation = s"${new File(".").getAbsolutePath}/checkpoint-test/${UUID.randomUUID()}"
val writeOptions = Map("checkpointLocation" -> checkPointLocation)
readStream(streamKey, extraWriteOptions = writeOptions) { spark =>
(1 to 5).foreach { i =>
conn.xadd(streamKey, new EntryID(0, i), Person.dataMaps.head.asJava)
}
}

// write 5 more items to stream
(6 to 10).foreach { i =>
conn.xadd(streamKey, new EntryID(0, i), Person.dataMaps.head.asJava)
}

// restart stream
readStream(streamKey, extraWriteOptions = writeOptions, writeFormat = "console") { spark =>
}
}
}

def readStream(streamKey: String,
extraReadOptions: Map[String, String] = Map(),
extraWriteOptions: Map[String, String] = Map(),
writeFormat: String = "memory")(body: SparkSession => Unit): Unit = {

val (spark, query) = readStream2(streamKey, extraReadOptions, extraWriteOptions, writeFormat)
// give some time for spark query to start
Thread.sleep(50)
try {
@@ -207,7 +237,10 @@ trait RedisStreamSourceSuite extends FunSuite with Matchers with Env {
}
}

def readStream2(streamKey: String, extraOptions: Map[String, String] = Map()): (SparkSession, StreamingQuery) = {
def readStream2(streamKey: String,
extraReadOptions: Map[String, String],
extraWriteOptions: Map[String, String],
writeFormat: String): (SparkSession, StreamingQuery) = {
val spark = SparkSession
.builder
.config(conf)
@@ -219,13 +252,17 @@ trait RedisStreamSourceSuite extends FunSuite with Matchers with Env {
.option(StreamOptionStreamKeys, streamKey)

// apply extra reader options
val reader = extraOptions.foldLeft(readerBase) { case (r, (k, v)) => r.option(k, v) }
val reader = extraReadOptions.foldLeft(readerBase) { case (r, (k, v)) => r.option(k, v) }

val persons = reader.load()
val query = persons.writeStream
.format("memory")
val queryBase = persons.writeStream
.format(writeFormat)
.queryName("persons")
.start()

// apply extra writer options
val queryWithOptions = extraWriteOptions.foldLeft(queryBase) { case (r, (k, v)) => r.option(k, v) }

val query = queryWithOptions.start()

println(s"query id ${query.id}")
(spark, query)
