Skip to content

Commit

Permalink
documentation for 0.1.0 (#2)
Browse files Browse the repository at this point in the history
* scaladoc

* fix bug in TDigestArrayUDAF, forgot to re-store updated buffer

* version 0.1.0

* t-digest UDAF examples
  • Loading branch information
erikerlandson authored Jul 4, 2017
1 parent 41f6da1 commit 00a7c3f
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 3 deletions.
141 changes: 141 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,143 @@
# isarn-sketches-spark
Routines and data structures for using isarn-sketches idiomatically in Apache Spark

## API documentation
https://isarn.github.io/isarn-sketches-spark/latest/api/#org.isarnproject.sketches.udaf.package

## How to use in your project

#### sbt
``` scala
resolvers += "isarn project" at "https://dl.bintray.com/isarn/maven/"

libraryDependencies += "org.isarnproject" %% "isarn-sketches-spark" % "0.1.0"
```

#### maven
``` xml
<dependency>
<groupId>org.isarnproject</groupId>
<artifactId>isarn-sketches-spark_2.10</artifactId>
<version>0.1.0</version>
<type>pom</type>
</dependency>
```

## Examples

### Sketch a numeric column
```scala
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._

scala> import scala.util.Random.nextGaussian
import scala.util.Random.nextGaussian

scala> val data = sc.parallelize(Vector.fill(1000){(nextGaussian, nextGaussian)}).toDF.as[(Double, Double)]
data: org.apache.spark.sql.Dataset[(Double, Double)] = [_1: double, _2: double]

scala> val udaf = tdigestUDAF[Double].delta(0.2).maxDiscrete(25)
udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.2,25)

scala> val agg = data.agg(udaf($"_1"), udaf($"_2"))
agg: org.apache.spark.sql.DataFrame = [tdigestudaf(_1): tdigest, tdigestudaf(_2): tdigest]

scala> val (td1, td2) = (agg.first.getAs[TDigestSQL](0).tdigest, agg.first.getAs[TDigestSQL](1).tdigest)
td1: org.isarnproject.sketches.TDigest = TDigest(0.2,25,151,TDigestMap(-3.1241237514093707 -> (1.0, 1.0), ...

scala> td1.cdf(0)
res1: Double = 0.5159531867457404

scala> td2.cdf(0)
res2: Double = 0.504233763693618
```

### Sketch a numeric array column
```scala
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._

scala> import scala.util.Random._
import scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vector.fill(5){nextGaussian})})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: array<double>]

scala> val udaf1 = tdigestUDAF[Int].maxDiscrete(20)
udaf1: org.isarnproject.sketches.udaf.TDigestUDAF[Int] = TDigestUDAF(0.5,20)

scala> val udafA = tdigestArrayUDAF[Double]
udafA: org.isarnproject.sketches.udaf.TDigestArrayUDAF[Double] = TDigestArrayUDAF(0.5,0)

scala> val (first1, firstA) = (data.agg(udaf1($"_1")).first, data.agg(udafA($"_2")).first)
first1: org.apache.spark.sql.Row = [TDigestSQL(TDigest(0.5,20,19,TDigestMap(-9.0 -> (51.0, 51.0),...
firstA: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@782b0d37)]

scala> val sample1 = Vector.fill(10) { first1.getAs[TDigestSQL](0).tdigest.sample }
sample1: scala.collection.immutable.Vector[Double] = Vector(0.0, 7.0, 9.0, 6.0, 1.0, 3.0, 4.0, 0.0, 9.0, 0.0)

scala> val sampleA = firstA.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
sampleA: Array[Double] = Array(0.5079398036724695, 0.7518583956493221, -0.054376728126603546, 0.7141623682043323, 0.4788564991204228)
```

### Sketch a column of ML Vector
```scala
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._

scala> import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vectors

scala> import scala.util.Random._
import scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]

scala> val udafV = tdigestMLVecUDAF
udafV: org.isarnproject.sketches.udaf.TDigestMLVecUDAF = TDigestMLVecUDAF(0.5,0)

scala> val firstV = data.agg(udafV($"_2")).first
firstV: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@42b579cd)]

scala> val sampleV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
sampleV: Array[Double] = Array(1.815862652134914, 0.24668895676164276, 0.09236479932949887)

scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
medianV: Array[Double] = Array(-0.049806905959001196, -0.08528817932077674, -0.05291800642695017)
```

### Sketch a column of MLLib Vector
```scala
scala> import org.isarnproject.sketches._, org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> import scala.util.Random._
import scala.util.Random._

scala> val data = spark.createDataFrame(Vector.fill(1000){(nextInt(10), Vectors.dense(nextGaussian,nextGaussian,nextGaussian))})
data: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]

scala> val udafV = tdigestMLLibVecUDAF
udafV: org.isarnproject.sketches.udaf.TDigestMLLibVecUDAF = TDigestMLLibVecUDAF(0.5,0)

scala> val firstV = data.agg(udafV($"_2")).first
firstV: org.apache.spark.sql.Row = [TDigestArraySQL([Lorg.isarnproject.sketches.TDigest;@6bffea90)]

scala> val sampleV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.sample)
sampleV: Array[Double] = Array(0.10298190759496548, -0.1968752746464183, -1.0139250851274562)

scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
medianV: Array[Double] = Array(0.025820266848484798, 0.01951778217339037, 0.09701138847692858)
```
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ organization := "org.isarnproject"

bintrayOrganization := Some("isarn")

version := "0.1.0.rc1"
version := "0.1.0"

scalaVersion := "2.11.8"

Expand All @@ -31,7 +31,7 @@ def commonSettings = Seq(
|import org.apache.spark.isarnproject.sketches.udt._
|val initialConf = new SparkConf().setAppName("repl").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer", "16mb")
|val spark = SparkSession.builder.config(initialConf).master("local[2]").getOrCreate()
|import spark._
|import spark._, spark.implicits._
|val sc = spark.sparkContext
|import org.apache.log4j.{Logger, ConsoleAppender, Level}
|Logger.getRootLogger().getAppender("console").asInstanceOf[ConsoleAppender].setThreshold(Level.WARN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,20 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArra
import org.isarnproject.sketches.TDigest
import org.isarnproject.sketches.tdmap.TDigestMap

/** A type for receiving the results of deserializing [[TDigestUDT]].
* The payload is the tdigest member field, holding a TDigest object.
* This is necessary because (a) I define the TDigest type is defined in the isarn-sketches
* package and I do not
* want any Spark dependencies on that package, and (b) Spark has privatized UserDefinedType under
* org.apache.spark scope, and so I have to have a paired result type in the same scope.
* @param tdigest The TDigest payload, which does the actual sketching.
*/
@SQLUserDefinedType(udt = classOf[TDigestUDT])
case class TDigestSQL(tdigest: TDigest)

/** A UserDefinedType for serializing and deserializing [[TDigestSQL]] structures during UDAF
* aggregations.
*/
class TDigestUDT extends UserDefinedType[TDigestSQL] {
def userClass: Class[TDigestSQL] = classOf[TDigestSQL]

Expand Down Expand Up @@ -66,11 +77,21 @@ class TDigestUDT extends UserDefinedType[TDigestSQL] {
}
}

/** Instantiated instance of [[TDigestUDT]] for use by UDAF objects */
case object TDigestUDT extends TDigestUDT

/** A type for receiving the results of deserializing [[TDigestArrayUDT]].
* The payload is the tdigests member field, holding an Array of TDigest objects.
* @param tdigests An array of TDigest objects, which do the actual sketching.
* @see [[TDigestSQL]] for additional context
*/
@SQLUserDefinedType(udt = classOf[TDigestArrayUDT])
case class TDigestArraySQL(tdigests: Array[TDigest])

/**
* A UserDefinedType for serializing and deserializing [[TDigestArraySQL]] objects
* during UDAF aggregations
*/
class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] {
def userClass: Class[TDigestArraySQL] = classOf[TDigestArraySQL]

Expand Down Expand Up @@ -124,9 +145,10 @@ class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] {
}
}

/** A [[TDigestArrayUDT]] instance for use in declaring UDAF objects */
case object TDigestArrayUDT extends TDigestArrayUDT

// VectorUDT is private[spark], but I can expose what I need this way:
/** Shims for exposing Spark's VectorUDT objects outside of org.apache.spark scope */
object TDigestUDTInfra {
private val udtML = new org.apache.spark.ml.linalg.VectorUDT
def udtVectorML: DataType = udtML
Expand Down
36 changes: 36 additions & 0 deletions src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,21 @@ import org.isarnproject.sketches.TDigest

import org.apache.spark.isarnproject.sketches.udt._

/**
* A UDAF for sketching numeric data with a TDigest.
* Expected to be created using [[tdigestUDAF]].
* @tparam N the expected numeric type of the data; Double, Int, etc
* @param deltaV The delta value to be used by the TDigest object
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
*/
case class TDigestUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
num: Numeric[N],
dataTpe: TDigestUDAFDataType[N]) extends UserDefinedAggregateFunction {

/** customize the delta value to be used by the TDigest object */
def delta(deltaP: Double) = this.copy(deltaV = deltaP)

/** customize the maxDiscrete value to be used by the TDigest object */
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)

// A t-digest is deterministic, but it is only statistically associative or commutative
Expand Down Expand Up @@ -59,6 +68,7 @@ case class TDigestUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
def evaluate(buf: Row): Any = buf.getAs[TDigestSQL](0)
}

/** A base class that defines the common functionality for array sketching UDAFs */
abstract class TDigestMultiUDAF extends UserDefinedAggregateFunction {
def deltaV: Double
def maxDiscreteV: Int
Expand Down Expand Up @@ -90,11 +100,19 @@ abstract class TDigestMultiUDAF extends UserDefinedAggregateFunction {
def evaluate(buf: Row): Any = buf.getAs[TDigestArraySQL](0)
}

/**
* A UDAF for sketching a column of ML Vectors with an array of TDigest objects.
* Expected to be created using [[tdigestMLVecUDAF]].
* @param deltaV The delta value to be used by the TDigest object
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
*/
case class TDigestMLVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMultiUDAF {
import org.apache.spark.ml.linalg.{ Vector => Vec }

/** customize the delta value to be used by the TDigest object */
def delta(deltaP: Double) = this.copy(deltaV = deltaP)

/** customize the maxDiscrete value to be used by the TDigest object */
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)

def inputSchema: StructType = StructType(StructField("vector", TDigestUDTInfra.udtVectorML) :: Nil)
Expand Down Expand Up @@ -124,11 +142,19 @@ case class TDigestMLVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMu
}
}

/**
* A UDAF for sketching a column of MLLib Vectors with an array of TDigest objects.
* Expected to be created using [[tdigestMLLibVecUDAF]].
* @param deltaV The delta value to be used by the TDigest object
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest object
*/
case class TDigestMLLibVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDigestMultiUDAF {
import org.apache.spark.mllib.linalg.{ Vector => Vec }

/** customize the delta value to be used by the TDigest object */
def delta(deltaP: Double) = this.copy(deltaV = deltaP)

/** customize the maxDiscrete value to be used by the TDigest object */
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)

def inputSchema: StructType =
Expand Down Expand Up @@ -159,12 +185,21 @@ case class TDigestMLLibVecUDAF(deltaV: Double, maxDiscreteV: Int) extends TDiges
}
}

/**
* A UDAF for sketching a column of numeric ArrayData with an array of TDigest objects.
* Expected to be created using [[tdigestArrayUDAF]].
* @tparam N the expected numeric type of the data; Double, Int, etc
* @param deltaV The delta value to be used by the TDigest objects
* @param maxDiscreteV The maxDiscrete value to be used by the TDigest objects
*/
case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
num: Numeric[N],
dataTpe: TDigestUDAFDataType[N]) extends TDigestMultiUDAF {

/** customize the delta value to be used by the TDigest object */
def delta(deltaP: Double) = this.copy(deltaV = deltaP)

/** customize the maxDiscrete value to be used by the TDigest object */
def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP)

def inputSchema: StructType =
Expand All @@ -183,6 +218,7 @@ case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit
if (x != null) tdigests(j) += num.toDouble(x)
j += 1
}
buf(0) = TDigestArraySQL(tdigests)
}
}
}
Loading

0 comments on commit 00a7c3f

Please sign in to comment.