[SPARK-1678][SPARK-1679] In-memory compression bug fix and made compression configurable, disabled by default

In-memory compression is now configurable in `SparkConf` via the `spark.sql.inMemoryColumnarStorage.compressed` property (renamed from `spark.sql.inMemoryCompression.enabled` during review) and is disabled by default.

To ease code review, the bug fix is in [the first commit](liancheng/spark@d537a36) and the compression configuration is in [the second](liancheng/spark@4ce09aa).
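For a sense of how the new flag is used end to end, here is a minimal sketch (the local-mode setup and the `people` table name are illustrative, not part of this change):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Compression is off by default; opt in through SparkConf.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("columnar-compression-demo")
  .set("spark.sql.inMemoryColumnarStorage.compressed", "true")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// cacheTable reads the flag and builds compressed column buffers.
sqlContext.cacheTable("people") // "people" is a hypothetical registered table
```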

Author: Cheng Lian <[email protected]>

Closes mesos#608 from liancheng/spark-1678 and squashes the following commits:

66c3a8d [Cheng Lian] Renamed in-memory compression configuration key
f8fb3a0 [Cheng Lian] Added assertions for testing .hasNext of various decoders
4ce09aa [Cheng Lian] Made in-memory compression configurable via SparkConf
d537a36 [Cheng Lian] Fixed SPARK-1678
liancheng authored and pwendell committed May 6, 2014
1 parent 98750a7 commit 6d721c5
Showing 17 changed files with 105 additions and 26 deletions.
7 changes: 5 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -162,8 +162,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName)
val useCompression =
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
val asInMemoryRelation =
InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
InMemoryColumnarTableScan(
currentTable.output, executePlan(currentTable).executedPlan, useCompression)

catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
}
@@ -173,7 +176,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
@@ -28,7 +28,7 @@ private[sql] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
def initialize(initialSize: Int, columnName: String = "")
def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)

/**
* Appends `row(ordinal)` to the column builder.
@@ -55,7 +55,11 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](

protected var buffer: ByteBuffer = _

override def initialize(initialSize: Int, columnName: String = "") = {
override def initialize(
initialSize: Int,
columnName: String = "",
useCompression: Boolean = false) = {

val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName

@@ -130,7 +134,12 @@ private[sql] object ColumnBuilder {
}
}

def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
def apply(
typeId: Int,
initialSize: Int = 0,
columnName: String = "",
useCompression: Boolean = false): ColumnBuilder = {

val builder = (typeId match {
case INT.typeId => new IntColumnBuilder
case LONG.typeId => new LongColumnBuilder
@@ -144,7 +153,7 @@
case GENERIC.typeId => new GenericColumnBuilder
}).asInstanceOf[ColumnBuilder]

builder.initialize(initialSize, columnName)
builder.initialize(initialSize, columnName, useCompression)
builder
}
}
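To make the threading of `useCompression` concrete, a hedged sketch of driving the builder directly (these classes are `private[sql]`, so real callers live inside `org.apache.spark.sql`; the sample rows are illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.columnar._

// Build an INT column with compression enabled.
val builder = ColumnBuilder(INT.typeId, initialSize = 0, columnName = "id", useCompression = true)

val row = new GenericMutableRow(1)
(1 to 3).foreach { i =>
  row.setInt(0, i)           // stage the value at ordinal 0
  builder.appendFrom(row, 0) // append row(0) to the column
}

val buffer = builder.build() // ByteBuffer holding the encoded column
```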
@@ -20,8 +20,12 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
import org.apache.spark.sql.Row
import org.apache.spark.SparkConf

private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
private[sql] case class InMemoryColumnarTableScan(
attributes: Seq[Attribute],
child: SparkPlan,
useCompression: Boolean)
extends LeafNode {

override def output: Seq[Attribute] = attributes
@@ -30,7 +34,7 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
val output = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = output.map { attribute =>
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
}.toArray

var row: Row = null
@@ -40,12 +40,12 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
private var pos: Int = _
private var nullCount: Int = _

abstract override def initialize(initialSize: Int, columnName: String) {
abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
nulls = ByteBuffer.allocate(1024)
nulls.order(ByteOrder.nativeOrder())
pos = 0
nullCount = 0
super.initialize(initialSize, columnName)
super.initialize(initialSize, columnName, useCompression)
}

abstract override def appendFrom(row: Row, ordinal: Int) {
@@ -32,5 +32,7 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc
decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
}

abstract override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
abstract override def hasNext = super.hasNext || decoder.hasNext

override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
}
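The read-path companion to the decoder fix further below: the accessor's underlying buffer can be exhausted while the active decoder is still mid-run, so `hasNext` must consult both sides. A hedged sketch of the consuming pattern (`accessor` stands in for any concrete `ColumnAccessor` with this trait mixed in):

```scala
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

// Drain a compressed column: hasNext stays true while either raw bytes
// remain (super.hasNext) or the decoder is mid-run (decoder.hasNext).
val row = new GenericMutableRow(1)
while (accessor.hasNext) {
  accessor.extractTo(row, 0) // decode one value into ordinal 0
}
```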
@@ -47,7 +47,17 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

import CompressionScheme._

val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
var compressionEncoders: Seq[Encoder[T]] = _

abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
compressionEncoders =
if (useCompression) {
schemes.filter(_.supports(columnType)).map(_.encoder[T])
} else {
Seq(PassThrough.encoder)
}
super.initialize(initialSize, columnName, useCompression)
}

protected def isWorthCompressing(encoder: Encoder[T]) = {
encoder.compressionRatio < 0.8
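A design note on the fallback: with compression disabled, the builder still routes through a single `PassThrough` encoder instead of bypassing the compression machinery, so the buffer layout (a scheme id followed by the column bytes, as read back by the accessor above) stays uniform either way. The selection rule restated in isolation, as a sketch rather than the builder's actual method:

```scala
// All supporting schemes when compression is on; the identity
// PassThrough encoder when it is off.
def selectEncoders[T <: NativeType](
    useCompression: Boolean,
    schemes: Seq[CompressionScheme],
    columnType: NativeColumnType[T]): Seq[Encoder[T]] = {
  if (useCompression) {
    schemes.filter(_.supports(columnType)).map(_.encoder[T])
  } else {
    Seq(PassThrough.encoder)
  }
}
```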
@@ -157,7 +157,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
currentValue
}

override def hasNext = buffer.hasRemaining
override def hasNext = valueCount < run || buffer.hasRemaining
}
}

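This one-line change is the heart of SPARK-1678: run-length encoding stores each run of identical values once, as a (value, run-length) pair, so the byte buffer can be fully drained while the current run still has repetitions left to replay. A hedged sketch of the failure mode, assuming a run-length decoder over a string column:

```scala
// Two equal values are encoded as a single ("test", run = 2) pair.
// Old check:  hasNext = buffer.hasRemaining
//   After the pair is read, hasRemaining is false, so the second
//   repetition is silently dropped: "test" comes back only once.
// New check:  hasNext = valueCount < run || buffer.hasRemaining
//   The current run is replayed to completion before giving up.
val decoded = scala.collection.mutable.ArrayBuffer.empty[String]
while (decoder.hasNext) {
  decoded += decoder.next() // replays the run, then reads the next pair
}
assert(decoded == Seq("test", "test"))
```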
@@ -77,7 +77,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case scan @ InMemoryColumnarTableScan(output, child) =>
case scan @ InMemoryColumnarTableScan(output, _, _) =>
scan.copy(attributes = output.map(_.newInstance))
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
11 changes: 11 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -73,4 +73,15 @@ object TestData {
ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) ::
ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil)
arrayData.registerAsTable("arrayData")

case class StringData(s: String)
val repeatedData =
TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test")))
repeatedData.registerAsTable("repeatedData")

val nullableRepeatedData =
TestSQLContext.sparkContext.parallelize(
List.fill(2)(StringData(null)) ++
List.fill(2)(StringData("test")))
nullableRepeatedData.registerAsTable("nullableRepeatedData")
}
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))

checkAnswer(scan, testData.collect().toSeq)
}

test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))

checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -44,9 +44,33 @@ class InMemoryColumnarQuerySuite extends QueryTest {

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))

checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
}

test("SPARK-1678 regression: compression must not lose repeated values") {
checkAnswer(
sql("SELECT * FROM repeatedData"),
repeatedData.collect().toSeq)

TestSQLContext.cacheTable("repeatedData")

checkAnswer(
sql("SELECT * FROM repeatedData"),
repeatedData.collect().toSeq)
}

test("with null values") {
checkAnswer(
sql("SELECT * FROM nullableRepeatedData"),
nullableRepeatedData.collect().toSeq)

TestSQLContext.cacheTable("nullableRepeatedData")

checkAnswer(
sql("SELECT * FROM nullableRepeatedData"),
nullableRepeatedData.collect().toSeq)
}
}
@@ -72,7 +72,12 @@ class BooleanBitSetSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)

val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
if (values.nonEmpty) {
values.foreach { value =>
assert(decoder.hasNext)
expectResult(value, "Wrong decoded value")(decoder.next())
}
}
assert(!decoder.hasNext)
}

@@ -98,8 +98,11 @@ class DictionaryEncodingSuite extends FunSuite {

val decoder = DictionaryEncoding.decoder(buffer, columnType)

inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
assert(decoder.hasNext)
expectResult(values(i), "Wrong decoded value")(decoder.next())
}
}

assert(!decoder.hasNext)
@@ -96,7 +96,12 @@ class IntegralDeltaSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)

val decoder = scheme.decoder(buffer, columnType)
input.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
if (input.nonEmpty) {
input.foreach { value =>
assert(decoder.hasNext)
expectResult(value, "Wrong decoded value")(decoder.next())
}
}
assert(!decoder.hasNext)
}

@@ -81,8 +81,11 @@ class RunLengthEncodingSuite extends FunSuite {

val decoder = RunLengthEncoding.decoder(buffer, columnType)

inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
if (inputSeq.nonEmpty) {
inputSeq.foreach { i =>
assert(decoder.hasNext)
expectResult(values(i), "Wrong decoded value")(decoder.next())
}
}

assert(!decoder.hasNext)
@@ -38,7 +38,7 @@ object TestCompressibleColumnBuilder {
scheme: CompressionScheme) = {

val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
builder.initialize(0)
builder.initialize(0, "", useCompression = true)
builder
}
}
@@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
castChildOutput(p, table, child)

case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
_, HiveTableScan(_, table, _))), _, child, _) =>
_, HiveTableScan(_, table, _), _)), _, child, _) =>
castChildOutput(p, table, child)
}

@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
_, HiveTableScan(_, table, _))), partition, child, overwrite) =>
_, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
}
