diff --git a/README.md b/README.md
index d463135..58b2164 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,143 @@
 # isarn-sketches-spark
 Routines and data structures for using isarn-sketches idiomatically in Apache Spark
+
+## API documentation
+https://isarn.github.io/isarn-sketches-spark/latest/api/#org.isarnproject.sketches.udaf.package
+
+## How to use in your project
+
+#### sbt
+``` scala
+resolvers += "isarn project" at "https://dl.bintray.com/isarn/maven/"
+
+libraryDependencies += "org.isarnproject" %% "isarn-sketches-spark" % "0.1.0"
+```
+
+#### maven
+``` xml
+<dependency>
+  <groupId>org.isarnproject</groupId>
+  <artifactId>isarn-sketches-spark_2.10</artifactId>
+  <version>0.1.0</version>
+  <type>pom</type>
+</dependency>
+```
+
+## Examples
+
+### Sketch a numeric column
+```scala
+scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+import org.isarnproject.sketches._
+import org.isarnproject.sketches.udaf._
+import org.apache.spark.isarnproject.sketches.udt._
+
+scala> import scala.util.Random.nextGaussian
+import scala.util.Random.nextGaussian
+
+scala> val data = sc.parallelize(Vector.fill(1000){(nextGaussian, nextGaussian)}).toDF.as[(Double, Double)]
+data: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: double]
+
+scala> val udaf = tdigestUDAF[Double].delta(0.2).maxDiscrete(25)
+udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.2,25)
+
+scala> val agg = data.agg(udaf($"_1"), udaf($"_2"))
+agg: org.apache.spark.sql.DataFrame = [tdigestudaf(_1): tdigest, tdigestudaf(_2): tdigest]
+
+scala> val (td1, td2) = (agg.first.getAs[TDigestSQL](0).tdigest, agg.first.getAs[TDigestSQL](1).tdigest)
+td1: org.isarnproject.sketches.TDigest = TDigest(0.2,25,151,TDigestMap(-3.1241237514093707 -> (1.0, 1.0), ...
+
+scala> td1.cdf(0)
+res1: Double = 0.5159531867457404
+
+scala> td2.cdf(0)
+res2: Double = 0.504233763693618
+```
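+
+#### Quantiles and sampling
+Beyond `cdf`, a t-digest supports inverse-CDF and random-sampling queries. The
+following is a brief sketch using `td1` and `agg` from above; `cdfInverse` and
+`sample` are demonstrated with real REPL output in the sections below:
+```scala
+val median = td1.cdfInverse(0.5)  // approximate median of column _1
+val q95 = td1.cdfInverse(0.95)    // approximate 95th percentile
+val draw = td1.sample             // random draw from the sketched distribution
+
+// The udaf package also defines implicit conversions TDigestSQL => TDigest and
+// TDigestArraySQL => Array[TDigest], so the explicit .tdigest unwrapping above
+// can be omitted:
+val td: TDigest = agg.first.getAs[TDigestSQL](0)
+```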
+
+### Sketch a numeric array column
+```scala
+scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+import org.isarnproject.sketches._
+import org.isarnproject.sketches.udaf._
+import org.apache.spark.isarnproject.sketches.udt._
+
+scala> import scala.util.Random._
+import scala.util.Random._
+
+scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vector.fill(5){nextGaussian})})
+data: org.apache.spark.sql.DataFrame = [_1: int, _2: array<double>]
+
+scala> val udaf1 = tdigestUDAF[Int].maxDiscrete(20)
+udaf1: org.isarnproject.sketches.udaf.TDigestUDAF[Int] = TDigestUDAF(0.5,20)
+
+scala> val udafA = tdigestArrayUDAF[Double]
+udafA: org.isarnproject.sketches.udaf.TDigestArrayUDAF[Double] = TDigestArrayUDAF(0.5,0)
+
+scala> val (first1, firstA) = (data.agg(udaf1($"_1")).first, data.agg(udafA($"_2")).first)
+first1: org.apache.spark.sql.Row = [TDigestSQL(TDigest(0.5,20,19,TDigestMap(-9.0 -> (51.0, 51.0),...
+firstA: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@782b0d37)]
+
+scala> val sample1 = Vector.fill(10) { first1.getAs[TDigestSQL](0).tdigest.sample }
+sample1: scala.collection.immutable.Vector[Double] = Vector(0.0, 7.0, 9.0, 6.0, 1.0, 3.0, 4.0, 0.0, 9.0, 0.0)
+
+scala> val sampleA = firstA.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
+sampleA: Array[Double] = Array(0.5079398036724695, 0.7518583956493221, -0.054376728126603546, 0.7141623682043323, 0.4788564991204228)
+```
+
+### Sketch a column of ML Vectors
+```scala
+scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+import org.isarnproject.sketches._
+import org.isarnproject.sketches.udaf._
+import org.apache.spark.isarnproject.sketches.udt._
+
+scala> import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.Vectors
+
+scala> import scala.util.Random._
+import scala.util.Random._
+
+scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
+data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
+
+scala> val udafV = tdigestMLVecUDAF
+udafV: org.isarnproject.sketches.udaf.TDigestMLVecUDAF = TDigestMLVecUDAF(0.5,0)
+
+scala> val firstV = data.agg(udafV($"_2")).first
+firstV: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@42b579cd)]
+
+scala> val sampleV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
+sampleV: Array[Double] = Array(1.815862652134914, 0.24668895676164276, 0.09236479932949887)
+
+scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
+medianV: Array[Double] = Array(-0.049806905959001196, -0.08528817932077674, -0.05291800642695017)
+```
+
+### Sketch a column of MLLib Vectors
+```scala
+scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+import org.isarnproject.sketches._
+import org.isarnproject.sketches.udaf._
+import org.apache.spark.isarnproject.sketches.udt._
+
+scala> import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.Vectors
+
+scala> import scala.util.Random._
+import scala.util.Random._
+
+scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
+data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]
+
+scala> val udafV = tdigestMLLibVecUDAF
+udafV: org.isarnproject.sketches.udaf.TDigestMLLibVecUDAF = TDigestMLLibVecUDAF(0.5,0)
+
+scala> val firstV = data.agg(udafV($"_2")).first
+firstV: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@6bffea90)]
+
+scala> val sampleV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
+sampleV: Array[Double] = Array(0.10298190759496548, -0.1968752746464183, -1.0139250851274562)
+
+scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
+medianV: Array[Double] = Array(0.025820266848484798, 0.01951778217339037, 0.09701138847692858)
+```
diff --git a/build.sbt b/build.sbt
index 8c8ca30..e64dfcf 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,7 +4,7 @@ organization := "org.isarnproject"
 
 bintrayOrganization := Some("isarn")
 
-version := "0.1.0.rc1"
+version := "0.1.0"
 
 scalaVersion := "2.11.8"
 
@@ -31,7 +31,7 @@ def commonSettings = Seq(
     |import org.apache.spark.isarnproject.sketches.udt._
     |val initialConf = new SparkConf().setAppName("repl").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer", "16mb")
     |val spark = SparkSession.builder.config(initialConf).master("local[2]").getOrCreate()
-    |import spark._
+    |import spark._, spark.implicits._
     |val sc = spark.sparkContext
     |import org.apache.log4j.{Logger, ConsoleAppender, Level}
     |Logger.getRootLogger().getAppender("console").asInstanceOf[ConsoleAppender].setThreshold(Level.WARN)
diff --git a/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala b/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala
index 3ec78a4..267bd24 100644
--- a/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala
+++ b/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala
@@ -20,9 +20,20 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArra
 import org.isarnproject.sketches.TDigest
 import org.isarnproject.sketches.tdmap.TDigestMap
 
+/** A type for receiving the results of deserializing [[TDigestUDT]].
+ * The payload is the tdigest member field, holding a TDigest object.
+ * This is necessary because (a) the TDigest type is defined in the isarn-sketches
+ * package, and I do not want any Spark dependencies on that package, and (b) Spark
+ * has privatized UserDefinedType under org.apache.spark scope, and so I have to
+ * have a paired result type in the same scope.
+ * @param tdigest The TDigest payload, which does the actual sketching.
+ */
 @SQLUserDefinedType(udt = classOf[TDigestUDT])
 case class TDigestSQL(tdigest: TDigest)
 
+/** A UserDefinedType for serializing and deserializing [[TDigestSQL]] structures
+ * during UDAF aggregations.
+ */
 class TDigestUDT extends UserDefinedType[TDigestSQL] {
 
   def userClass: Class[TDigestSQL] = classOf[TDigestSQL]
@@ -66,11 +77,21 @@ class TDigestUDT extends UserDefinedType[TDigestSQL] {
   }
 }
 
+/** A singleton instance of [[TDigestUDT]] for use by UDAF objects */
 case object TDigestUDT extends TDigestUDT
 
+/** A type for receiving the results of deserializing [[TDigestArrayUDT]].
+ * The payload is the tdigests member field, holding an Array of TDigest objects.
+ * @param tdigests An array of TDigest objects, which do the actual sketching.
+ * @see [[TDigestSQL]] for additional context
+ */
 @SQLUserDefinedType(udt = classOf[TDigestArrayUDT])
 case class TDigestArraySQL(tdigests: Array[TDigest])
 
+/**
+ * A UserDefinedType for serializing and deserializing [[TDigestArraySQL]] objects
+ * during UDAF aggregations
+ */
 class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] {
 
   def userClass: Class[TDigestArraySQL] = classOf[TDigestArraySQL]
@@ -124,9 +145,10 @@ class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] {
   }
 }
 
+/** A [[TDigestArrayUDT]] instance for use in declaring UDAF objects */
 case object TDigestArrayUDT extends TDigestArrayUDT
 
-// VectorUDT is private[spark], but I can expose what I need this way:
+/** Shims for exposing Spark's VectorUDT objects outside of org.apache.spark scope */
 object TDigestUDTInfra {
   private val udtML = new org.apache.spark.ml.linalg.VectorUDT
   def udtVectorML: DataType = udtML
diff --git a/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala b/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala
index 62ab137..3bb4bd7 100644
--- a/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala
+++ b/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala
@@ -23,12 +23,21 @@ import org.isarnproject.sketches.TDigest
 
 import org.apache.spark.isarnproject.sketches.udt._
 
+/**
+ * A UDAF for sketching numeric data with a TDigest.
+ * Expected to be created using [[tdigestUDAF]].
+ * @tparam N the expected numeric type of the data; Double, Int, etc
+ * @param deltaV The delta value to be used by the TDigest object
+ * @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
+ */
 case class TDigestUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
     num: Numeric[N],
     dataTpe: TDigestUDAFDataType[N]) extends UserDefinedAggregateFunction {
 
+  /** customize the delta value to be used by the TDigest object */
   def delta(deltaP: Double) = this.copy(deltaV = deltaP)
 
+  /** customize the maxDiscrete value to be used by the TDigest object */
   def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
 
   // A t-digest is deterministic, but it is only statistically associative or commutative
@@ -59,6 +68,7 @@ case class TDigestUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
   def evaluate(buf: Row): Any = buf.getAs[TDigestSQL](0)
 }
 
+/** A base class that defines the common functionality for array sketching UDAFs */
 abstract class TDigestMultiUDAF extends UserDefinedAggregateFunction {
   def deltaV: Double
   def maxDiscreteV: Int
@@ -90,11 +100,19 @@ abstract class TDigestMultiUDAF extends UserDefinedAggregateFunction {
   def evaluate(buf: Row): Any = buf.getAs[TDigestArraySQL](0)
 }
 
+/**
+ * A UDAF for sketching a column of ML Vectors with an array of TDigest objects.
+ * Expected to be created using [[tdigestMLVecUDAF]].
+ * @param deltaV The delta value to be used by the TDigest object
+ * @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
+ */
 case class TDigestMLVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMultiUDAF {
   import org.apache.spark.ml.linalg.{ Vector => Vec }
 
+  /** customize the delta value to be used by the TDigest object */
   def delta(deltaP: Double) = this.copy(deltaV = deltaP)
 
+  /** customize the maxDiscrete value to be used by the TDigest object */
   def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
 
   def inputSchema: StructType = StructType(StructField("vector", TDigestUDTInfra.udtVectorML) :: Nil)
@@ -124,11 +142,19 @@ case class TDigestMLVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMu
   }
 }
 
+/**
+ * A UDAF for sketching a column of MLLib Vectors with an array of TDigest objects.
+ * Expected to be created using [[tdigestMLLibVecUDAF]].
+ * @param deltaV The delta value to be used by the TDigest object
+ * @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
+ */
 case class TDigestMLLibVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMultiUDAF {
   import org.apache.spark.mllib.linalg.{ Vector => Vec }
 
+  /** customize the delta value to be used by the TDigest object */
   def delta(deltaP: Double) = this.copy(deltaV = deltaP)
 
+  /** customize the maxDiscrete value to be used by the TDigest object */
   def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
 
   def inputSchema: StructType =
@@ -159,12 +185,21 @@ case class TDigestMLLibVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDiges
   }
 }
 
+/**
+ * A UDAF for sketching a column of numeric ArrayData with an array of TDigest objects.
+ * Expected to be created using [[tdigestArrayUDAF]].
+ * @tparam N the expected numeric type of the data; Double, Int, etc
+ * @param deltaV The delta value to be used by the TDigest objects
+ * @param maxDiscreteV The maxDiscrete value to be used by the TDigest objects
+ */
 case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
     num: Numeric[N],
     dataTpe: TDigestUDAFDataType[N]) extends TDigestMultiUDAF {
 
+  /** customize the delta value to be used by the TDigest object */
   def delta(deltaP: Double) = this.copy(deltaV = deltaP)
 
+  /** customize the maxDiscrete value to be used by the TDigest object */
   def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)
 
   def inputSchema: StructType =
@@ -183,6 +218,7 @@ case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
         if (x != null) tdigests(j) += num.toDouble(x)
         j += 1
       }
+      buf(0) = TDigestArraySQL(tdigests)
     }
   }
 }
diff --git a/src/main/scala/org/isarnproject/sketches/udaf/package.scala b/src/main/scala/org/isarnproject/sketches/udaf/package.scala
index 0003835..9f2f536 100644
--- a/src/main/scala/org/isarnproject/sketches/udaf/package.scala
+++ b/src/main/scala/org/isarnproject/sketches/udaf/package.scala
@@ -20,25 +20,91 @@ import org.apache.spark.sql.catalyst.util._
 
 import org.apache.spark.isarnproject.sketches.udt._
 
+/** package-wide methods, implicits and definitions for sketching UDAFs */
 package object udaf {
 
+  /**
+   * Obtain a UDAF for sketching a single numeric Dataset column using a t-digest
+   * @tparam N The numeric type of the column; Double, Int, etc
+   * @return A UDAF that can be applied to a Dataset column
+   * @example
+   * {{{
+   * import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+   * // create a UDAF for a t-digest, adding custom settings for delta and maxDiscrete
+   * val udafTD = tdigestUDAF[Double].delta(0.1).maxDiscrete(25)
+   * // apply the UDAF to get a t-digest for a data column
+   * val agg = data.agg(udafTD($"NumericColumn"))
+   * // extract the t-digest
+   * val td = agg.first.getAs[TDigestSQL](0).tdigest
+   * }}}
+   */
   def tdigestUDAF[N](implicit num: Numeric[N], dataType: TDigestUDAFDataType[N]) =
     TDigestUDAF(TDigest.deltaDefault, 0)
 
+  /**
+   * Obtain a UDAF for sketching a numeric array-data Dataset column, using a t-digest for
+   * each element.
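+   * For example, a column of 5-element arrays is sketched by 5 t-digests, one per array position.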
+   * @tparam N The numeric type of the array-data column; Double, Int, etc
+   * @return A UDAF that can be applied to a Dataset array-data column
+   * @example
+   * {{{
+   * import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+   * // create a UDAF for t-digest array, adding custom settings for delta and maxDiscrete
+   * val udafTD = tdigestArrayUDAF[Double].delta(0.1).maxDiscrete(25)
+   * // apply the UDAF to get an array of t-digests, one for each element in the array-data
+   * val agg = data.agg(udafTD($"NumericArrayColumn"))
+   * // extract the t-digest array
+   * val tdArray = agg.first.getAs[TDigestArraySQL](0).tdigests
+   * }}}
+   */
   def tdigestArrayUDAF[N](implicit num: Numeric[N], dataType: TDigestUDAFDataType[N]) =
     TDigestArrayUDAF(TDigest.deltaDefault, 0)
 
+  /**
+   * Obtain a UDAF for sketching an ML Vector Dataset column, using a t-digest for
+   * each element in the vector
+   * @return A UDAF that can be applied to an ML Vector column
+   * @example
+   * {{{
+   * import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+   * // create a UDAF for t-digest array, adding custom settings for delta and maxDiscrete
+   * val udafTD = tdigestMLVecUDAF.delta(0.1).maxDiscrete(25)
+   * // apply the UDAF to get an array of t-digests, one for each element in the vector
+   * val agg = data.agg(udafTD($"MLVecColumn"))
+   * // extract the t-digest array
+   * val tdArray = agg.first.getAs[TDigestArraySQL](0).tdigests
+   * }}}
+   */
   def tdigestMLVecUDAF = TDigestMLVecUDAF(TDigest.deltaDefault, 0)
 
+  /**
+   * Obtain a UDAF for sketching an MLLib Vector Dataset column, using a t-digest for
+   * each element in the vector
+   * @return A UDAF that can be applied to an MLLib Vector column
+   * @example
+   * {{{
+   * import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
+   * // create a UDAF for t-digest array, adding custom settings for delta and maxDiscrete
+   * val udafTD = tdigestMLLibVecUDAF.delta(0.1).maxDiscrete(25)
+   * // apply the UDAF to get an array of t-digests, one for each element in the vector
+   * val agg = data.agg(udafTD($"MLLibVecColumn"))
+   * // extract the t-digest array
+   * val tdArray = agg.first.getAs[TDigestArraySQL](0).tdigests
+   * }}}
+   */
   def tdigestMLLibVecUDAF = TDigestMLLibVecUDAF(TDigest.deltaDefault, 0)
 
+  /** implicitly unpack a TDigestSQL to extract its TDigest payload */
   implicit def implicitTDigestSQLToTDigest(tdsql: TDigestSQL): TDigest = tdsql.tdigest
+
+  /** implicitly unpack a TDigestArraySQL to extract its Array[TDigest] payload */
   implicit def implicitTDigestArraySQLToTDigestArray(tdasql: TDigestArraySQL): Array[TDigest] =
     tdasql.tdigests
 
+  /** For declaring implicit values that map numeric types to corresponding DataType values */
   case class TDigestUDAFDataType[N](tpe: DataType)
 
   implicit val tDigestUDAFDataTypeByte = TDigestUDAFDataType[Byte](ByteType)