From 16af0ef730cfc4f430bd63733022e7eb16872c47 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Sat, 2 Sep 2017 13:35:52 -0700 Subject: [PATCH] pyspark support (#5) * assembly * test some objects consumable from pyspark * experimenting with pyc files in jar * a working prototype of scala & python in same maven jar file * draft of python-side TDigestUDT and TDigest * cdf method for python TDigest * cdfInverse, also some doc * comment * cdfDiscrete, cdfDiscreteInverse, and sampling methods * Restore pyc in jars, clean up python bindings for UDAFs * fill in some missing method overrides for TDigestArrayUDT * fix typo tdigestFloatUDAF * TDigestArrayUDT for python * T-Digest reducing UDAFs * add some doc * document reducing variants * bump version, add sonatype publishing, py/spark version suffixes * python examples for readme --- README.md | 205 +++++++++++ build.sbt | 105 +++++- project/assembly.sbt | 1 + project/plugins.sbt | 4 + python/isarnproject/__init__.py | 0 python/isarnproject/sketches/__init__.py | 0 python/isarnproject/sketches/udaf/__init__.py | 0 python/isarnproject/sketches/udaf/tdigest.py | 176 ++++++++++ python/isarnproject/sketches/udt/__init__.py | 0 python/isarnproject/sketches/udt/tdigest.py | 322 ++++++++++++++++++ .../sketches/udt/TDigestUDT.scala | 30 ++ .../sketches/udaf/TDigestUDAF.scala | 123 +++++++ .../isarnproject/sketches/udaf/package.scala | 60 ++++ 13 files changed, 1020 insertions(+), 6 deletions(-) create mode 100644 project/assembly.sbt create mode 100644 python/isarnproject/__init__.py create mode 100644 python/isarnproject/sketches/__init__.py create mode 100644 python/isarnproject/sketches/udaf/__init__.py create mode 100644 python/isarnproject/sketches/udaf/tdigest.py create mode 100644 python/isarnproject/sketches/udt/__init__.py create mode 100644 python/isarnproject/sketches/udt/tdigest.py diff --git a/README.md b/README.md index 58b2164..111aaae 100644 --- a/README.md +++ b/README.md @@ -141,3 +141,208 @@ sampleV: Array[Double] = Array(0.10298190759496548, -0.1968752746464183, -1.0139 scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5)) medianV: Array[Double] = Array(0.025820266848484798, 0.01951778217339037, 0.09701138847692858) ``` + +### Reduce a column (or grouping) of T-Digests +```scala +scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._, org.isarnproject.sketches._, scala.util.Random._ +import org.isarnproject.sketches.udaf._ +import org.apache.spark.isarnproject.sketches.udt._ +import org.isarnproject.sketches._ +import scala.util.Random._ + +scala> val x = sc.parallelize(Vector.fill(1000) { nextGaussian }).toDF("x") +x: org.apache.spark.sql.DataFrame = [x: double] + +scala> val g = sc.parallelize(Seq(1,2,3,4,5)).toDF("g") +g: org.apache.spark.sql.DataFrame = [g: int] + +scala> val data = g.crossJoin(x) +data: org.apache.spark.sql.DataFrame = [g: int, x: double] + +scala> val udaf = tdigestUDAF[Double] +udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.5,0) + +scala> val tds = data.groupBy("g").agg(udaf($"x").alias("tdigests")) +tds: org.apache.spark.sql.DataFrame = [g: int, tdigests: tdigest] + +scala> tds.show() ++---+--------------------+ +| g| tdigests| ++---+--------------------+ +| 1|TDigestSQL(TDiges...| +| 3|TDigestSQL(TDiges...| +| 5|TDigestSQL(TDiges...| +| 4|TDigestSQL(TDiges...| +| 2|TDigestSQL(TDiges...| ++---+--------------------+ + +scala> val td = tds.agg(tdigestReduceUDAF($"tdigests").alias("tdigest")) +td: 
org.apache.spark.sql.DataFrame = [tdigest: tdigest] + +scala> td.show() ++--------------------+ +| tdigest| ++--------------------+ +|TDigestSQL(TDiges...| ++--------------------+ +``` + +### Reduce a column (or grouping) of T-Digest Arrays +```scala +scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._, org.isarnproject.sketches._, scala.util.Random._ +import org.isarnproject.sketches.udaf._ +import org.apache.spark.isarnproject.sketches.udt._ +import org.isarnproject.sketches._ +import scala.util.Random._ + +scala> val x = sc.parallelize(Vector.fill(1000) { Vector.fill(3) { nextGaussian } }).toDF("x") +x: org.apache.spark.sql.DataFrame = [x: array] + +scala> val g = sc.parallelize(Seq(1,2,3,4,5)).toDF("g") +g: org.apache.spark.sql.DataFrame = [g: int] + +scala> val data = g.crossJoin(x) +data: org.apache.spark.sql.DataFrame = [g: int, x: array] + +scala> val udaf = tdigestArrayUDAF[Double] +udaf: org.isarnproject.sketches.udaf.TDigestArrayUDAF[Double] = TDigestArrayUDAF(0.5,0) + +scala> val tds = data.groupBy("g").agg(udaf($"x").alias("tdigests")) +tds: org.apache.spark.sql.DataFrame = [g: int, tdigests: tdigestarray] + +scala> tds.show() ++---+--------------------+ +| g| tdigests| ++---+--------------------+ +| 1|TDigestArraySQL([...| +| 3|TDigestArraySQL([...| +| 5|TDigestArraySQL([...| +| 4|TDigestArraySQL([...| +| 2|TDigestArraySQL([...| ++---+--------------------+ + +scala> val td = tds.agg(tdigestArrayReduceUDAF($"tdigests")) +td: org.apache.spark.sql.DataFrame = [tdigestarrayreduceudaf(tdigests): tdigestarray] + +scala> td.show() ++--------------------------------+ +|tdigestarrayreduceudaf(tdigests)| ++--------------------------------+ +| TDigestArraySQL([...| ++--------------------------------+ +``` + +### Sketch a numeric column (python) +```python +>>> from isarnproject.sketches.udaf.tdigest import * +>>> from random import gauss +>>> from pyspark.sql.types import * +>>> data = sc.parallelize([[gauss(0,1)] for x in xrange(1000)]).toDF(StructType([StructField("x", DoubleType())])) +>>> agg = data.agg(tdigestDoubleUDAF("x")) +>>> td = agg.first()[0] +>>> td.cdfInverse(0.5) +0.046805581998797419 +>>> +``` + +### Sketch a numeric array column (python) +```python +>>> from isarnproject.sketches.udaf.tdigest import * +>>> from random import gauss +>>> from pyspark.sql.types import * +>>> data = sc.parallelize([[[gauss(0,1),gauss(0,1),gauss(0,1)]] for x in xrange(1000)]).toDF(StructType([StructField("x", ArrayType(DoubleType()))])) +>>> agg = data.agg(tdigestDoubleArrayUDAF("x")) +>>> tds = agg.first()[0] +>>> [t.cdfInverse(0.5) for t in td] +[0.046116924117141189, -0.011071666930287466, -0.019006033872431105] +>>> +``` + +### Sketch a column of ML Vectors (python) +```python +>>> from isarnproject.sketches.udaf.tdigest import * +>>> from random import gauss +>>> from pyspark.ml.linalg import VectorUDT, Vectors +>>> from pyspark.sql.types import * +>>> data = sc.parallelize([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in xrange(1000)]).toDF(StructType([StructField("x", VectorUDT())])) +>>> agg = data.agg(tdigestMLVecUDAF("x")) +>>> tds = agg.first()[0] +>>> [t.cdfInverse(0.5) for t in tds] +[0.02859498787770634, -0.0027338622700039117, 0.041590980872883487] +>>> +``` + +### Sketch a column of MLLib Vectors (python) +```python +>>> from isarnproject.sketches.udaf.tdigest import * +>>> from random import gauss +>>> from pyspark.mllib.linalg import VectorUDT, Vectors +>>> from pyspark.sql.types import * +>>> data = 
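+
+// Illustrative follow-up (not part of the original session): pull the reduced
+// t-digest out of the single-row result and query it, as in the earlier examples.
+scala> val medianAll = td.first.getAs[TDigestSQL](0).tdigest.cdfInverse(0.5)
+// medianAll: Double = (near 0.0; the exact value varies with the random data)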
sc.parallelize([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in xrange(1000)]).toDF(StructType([StructField("x", VectorUDT())])) +>>> agg = data.agg(tdigestMLLibVecUDAF("x")) +>>> tds = agg.first()[0] +>>> [t.cdfInverse(0.5) for t in tds] +[0.02859498787770634, -0.0027338622700039117, 0.041590980872883487] +>>> +``` + +### Reduce a column (or grouping) of T-Digests (python) +```python +>>> from isarnproject.sketches.udaf.tdigest import * +>>> from random import gauss +>>> from pyspark.sql.types import * +>>> x = sc.parallelize([[gauss(0,1)] for x in xrange(1000)]).toDF(StructType([StructField("x", DoubleType())])) +>>> g = sc.parallelize([[1+x] for x in xrange(5)]).toDF(StructType([StructField("g", IntegerType())])) +>>> data = g.crossJoin(x) +>>> tds = data.groupBy("g").agg(tdigestDoubleUDAF("x").alias("tdigests")) +>>> tds.show() ++---+--------------------+ +| g| tdigests| ++---+--------------------+ +| 1|TDigestSQL(TDiges...| +| 3|TDigestSQL(TDiges...| +| 5|TDigestSQL(TDiges...| +| 4|TDigestSQL(TDiges...| +| 2|TDigestSQL(TDiges...| ++---+--------------------+ + +>>> td = tds.agg(tdigestReduceUDAF("tdigests").alias("tdigest")) +>>> td.show() ++--------------------+ +| tdigest| ++--------------------+ +|TDigestSQL(TDiges...| ++--------------------+ + +>>> +``` + +### Reduce a column (or grouping) of T-Digest Arrays (python) +```python +>>> from isarnproject.sketches.udaf.tdigest import * +>>> from random import gauss +>>> from pyspark.ml.linalg import VectorUDT, Vectors +>>> from pyspark.sql.types import * +>>> x = sc.parallelize([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in xrange(1000)]).toDF(StructType([StructField("x", VectorUDT())])) +>>> g = sc.parallelize([[1+x] for x in xrange(5)]).toDF(StructType([StructField("g", IntegerType())])) +>>> data = g.crossJoin(x) +>>> tds = data.groupBy("g").agg(tdigestMLVecUDAF("x").alias("tdigests")) +>>> tds.show() ++---+--------------------+ +| g| tdigests| ++---+--------------------+ +| 1|TDigestArraySQL([...| +| 3|TDigestArraySQL([...| +| 5|TDigestArraySQL([...| +| 4|TDigestArraySQL([...| +| 2|TDigestArraySQL([...| ++---+--------------------+ + +>>> td = tds.agg(tdigestArrayReduceUDAF("tdigests").alias("tdigest")) +>>> td.show() ++--------------------+ +| tdigest| ++--------------------+ +|TDigestArraySQL([...| ++--------------------+ +``` diff --git a/build.sbt b/build.sbt index e64dfcf..46517ce 100644 --- a/build.sbt +++ b/build.sbt @@ -4,19 +4,65 @@ organization := "org.isarnproject" bintrayOrganization := Some("isarn") -version := "0.1.0" +val packageVersion = "0.2.0" + +val sparkVersion = "2.2.0" + +val pythonVersion = "2.7" + +val sparkSuffix = s"""sp${sparkVersion.split('.').take(2).mkString(".")}""" + +val pythonSuffix = s"""py${pythonVersion.split('.').take(2).mkString(".")}""" + +val pythonCMD = s"""python${pythonVersion.split('.').head}""" + +version := s"${packageVersion}-${sparkSuffix}-${pythonSuffix}" scalaVersion := "2.11.8" crossScalaVersions := Seq("2.10.6", "2.11.8") +useGpg := true + +pomIncludeRepository := { _ => false } + +publishMavenStyle := true + +publishTo := { + val nexus = "https://oss.sonatype.org/" + if (isSnapshot.value) + Some("snapshots" at nexus + "content/repositories/snapshots") + else + Some("releases" at nexus + "service/local/staging/deploy/maven2") +} + +licenses += ("Apache-2.0", url("http://opensource.org/licenses/Apache-2.0")) + +homepage := Some(url("https://github.com/isarn/isarn-sketches-spark")) + +scmInfo := Some( + ScmInfo( + 
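+>>> # note: xrange assumes Python 2 (build.sbt sets pythonVersion = "2.7"); use range(1000) on Python 3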
url("https://github.com/isarn/isarn-sketches-spark"), + "scm:git@github.com:isarn/isarn-sketches-spark.git" + ) +) + +developers := List( + Developer( + id = "erikerlandson", + name = "Erik Erlandson", + email = "eje@redhat.com", + url = url("https://erikerlandson.github.io/") + ) +) + def commonSettings = Seq( libraryDependencies ++= Seq( - "org.isarnproject" %% "isarn-sketches" % "0.1.0", - "org.apache.spark" %% "spark-core" % "2.1.0" % Provided, - "org.apache.spark" %% "spark-sql" % "2.1.0" % Provided, - "org.apache.spark" %% "spark-mllib" % "2.1.0" % Provided, - "org.isarnproject" %% "isarn-scalatest" % "0.0.1" % Test, + "org.isarnproject" %% "isarn-sketches" % "0.1.1", + "org.apache.spark" %% "spark-core" % sparkVersion % Provided, + "org.apache.spark" %% "spark-sql" % sparkVersion % Provided, + "org.apache.spark" %% "spark-mllib" % sparkVersion % Provided, + "org.isarnproject" %% "isarn-scalatest" % "0.0.2" % Test, "org.scalatest" %% "scalatest" % "2.2.4" % Test, "org.apache.commons" % "commons-math3" % "3.6.1" % Test), initialCommands in console := """ @@ -45,6 +91,53 @@ licenses += ("Apache-2.0", url("http://opensource.org/licenses/Apache-2.0")) scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature") +lazy val deletePYC = taskKey[Unit]("Delete .pyc files") + +deletePYC := { + val s: TaskStreams = streams.value + s.log.info("delete .pyc files...") + val cmd = "bash" :: "-c" :: "rm -f $(find python -name *.pyc)" :: Nil + val stat = (cmd !) + if (stat == 0) { + s.log.info("delete .pyc succeeded") + } else { + throw new IllegalStateException("delete .pyc failed") + } +} + +lazy val compilePython = taskKey[Unit]("Compile python files") + +compilePython := { + val s: TaskStreams = streams.value + s.log.info("compiling python...") + val stat = (Seq(pythonCMD, "-m", "compileall", "python/") !) 
+ if (stat == 0) { + s.log.info("python compile succeeded") + } else { + throw new IllegalStateException("python compile failed") + } +} + +compilePython <<= compilePython.dependsOn(deletePYC) + +(packageBin in Compile) <<= (packageBin in Compile).dependsOn(compilePython) + +mappings in (Compile, packageBin) ++= Seq( + (baseDirectory.value / "python" / "isarnproject" / "__init__.pyc") -> "isarnproject/__init__.pyc", + (baseDirectory.value / "python" / "isarnproject" / "sketches" / "__init__.pyc") -> "isarnproject/sketches/__init__.pyc", + (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "__init__.pyc") -> "isarnproject/sketches/udaf/__init__.pyc", + (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "tdigest.pyc") -> "isarnproject/sketches/udaf/tdigest.pyc", + (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "__init__.pyc") -> "isarnproject/sketches/udt/__init__.pyc", + (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "tdigest.pyc") -> "isarnproject/sketches/udt/tdigest.pyc" +) + +test in assembly := {} + +assemblyShadeRules in assembly := Seq( + ShadeRule.zap("scala.**").inAll, + ShadeRule.zap("org.slf4j.**").inAll +) + scalacOptions in (Compile, doc) ++= Seq("-doc-root-content", baseDirectory.value+"/root-doc.txt") site.settings diff --git a/project/assembly.sbt b/project/assembly.sbt new file mode 100644 index 0000000..e17409e --- /dev/null +++ b/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") diff --git a/project/plugins.sbt b/project/plugins.sbt index cbbf12b..4a0e171 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -13,6 +13,10 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.6.0") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.1") + +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.0") + // scoverage and coveralls deps are at old versions to avoid a bug in the current versions // update these when this fix is released: https://github.com/scoverage/sbt-coveralls/issues/73 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4") diff --git a/python/isarnproject/__init__.py b/python/isarnproject/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/isarnproject/sketches/__init__.py b/python/isarnproject/sketches/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/isarnproject/sketches/udaf/__init__.py b/python/isarnproject/sketches/udaf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/isarnproject/sketches/udaf/tdigest.py b/python/isarnproject/sketches/udaf/tdigest.py new file mode 100644 index 0000000..11b69e3 --- /dev/null +++ b/python/isarnproject/sketches/udaf/tdigest.py @@ -0,0 +1,176 @@ +from pyspark.sql.column import Column, _to_java_column, _to_seq +from pyspark.context import SparkContext + +__all__ = ['tdigestIntUDAF', 'tdigestLongUDAF', 'tdigestFloatUDAF', 'tdigestDoubleUDAF', \ + 'tdigestMLVecUDAF', 'tdigestMLLibVecUDAF', \ + 'tdigestIntArrayUDAF', 'tdigestLongArrayUDAF', \ + 'tdigestFloatArrayUDAF', 'tdigestDoubleArrayUDAF', \ + 'tdigestReduceUDAF', 'tdigestArrayReduceUDAF'] + +def tdigestIntUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of integer data. 
+ + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestIntUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestLongUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of long integer data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestLongUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestFloatUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of (single precision) float data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestFloatUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestDoubleUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of double float data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestDoubleUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestMLVecUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of ML Vector data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestMLVecUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestMLLibVecUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of MLLib Vector data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestMLLibVecUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestIntArrayUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of integer-array data. 
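+    Example (illustrative; assumes a DataFrame ``df`` with an integer column "n"):
+
+    >>> agg = df.agg(tdigestIntUDAF("n", maxDiscrete=25))
+    >>> agg.first()[0].cdfDiscrete(3)
+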
+ + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestIntArrayUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestLongArrayUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of long-integer array data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestLongArrayUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestFloatArrayUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of (single-precision) float array data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestFloatArrayUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestDoubleArrayUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of double array data. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestDoubleArrayUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestReduceUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of t-digests. + + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestReduceUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) + +def tdigestArrayReduceUDAF(col, delta=0.5, maxDiscrete=0): + """ + Return a UDAF for aggregating a column of t-digest vectors. 
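+    Example (illustrative; assumes a DataFrame ``df`` with an integer-array column "vals"):
+
+    >>> tds = df.agg(tdigestIntArrayUDAF("vals")).first()[0]
+    >>> [t.cdfInverse(0.5) for t in tds]  # per-element medians
+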
+ + :param col: name of the column to aggregate + :param delta: T-Digest compression parameter (default 0.5) + :param maxDiscrete: maximum unique discrete values to store before reverting to + continuous (default 0) + """ + sc = SparkContext._active_spark_context + tdapply = sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestArrayReduceUDAF( \ + delta, maxDiscrete).apply + return Column(tdapply(_to_seq(sc, [col], _to_java_column))) diff --git a/python/isarnproject/sketches/udt/__init__.py b/python/isarnproject/sketches/udt/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/isarnproject/sketches/udt/tdigest.py b/python/isarnproject/sketches/udt/tdigest.py new file mode 100644 index 0000000..c523fc5 --- /dev/null +++ b/python/isarnproject/sketches/udt/tdigest.py @@ -0,0 +1,322 @@ + +import sys +import random + +import numpy as np + +from pyspark.sql.types import UserDefinedType, StructField, StructType, \ + ArrayType, DoubleType, IntegerType + +__all__ = ['TDigest'] + +class TDigestUDT(UserDefinedType): + @classmethod + def sqlType(cls): + return StructType([ + StructField("delta", DoubleType(), False), + StructField("maxDiscrete", IntegerType(), False), + StructField("nclusters", IntegerType(), False), + StructField("clustX", ArrayType(DoubleType(), False), False), + StructField("clustM", ArrayType(DoubleType(), False), False)]) + + @classmethod + def module(cls): + return "isarnproject.sketches.udt.tdigest" + + @classmethod + def scalaUDT(cls): + return "org.apache.spark.isarnproject.sketches.udt.TDigestUDT" + + def simpleString(self): + return "tdigest" + + def serialize(self, obj): + if isinstance(obj, TDigest): + return (obj.delta, obj.maxDiscrete, obj.nclusters, \ + [float(v) for v in obj.clustX], \ + [float(v) for v in obj.clustM]) + else: + raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) + + def deserialize(self, datum): + return TDigest(datum[0], datum[1], datum[2], datum[3], datum[4]) + +class TDigestArrayUDT(UserDefinedType): + @classmethod + def sqlType(cls): + return StructType([ + StructField("delta", DoubleType(), False), + StructField("maxDiscrete", IntegerType(), False), + StructField("clusterS", ArrayType(IntegerType(), False), False), + StructField("clusterX", ArrayType(DoubleType(), False), False), + StructField("clusterM", ArrayType(DoubleType(), False), False)]) + + @classmethod + def module(cls): + return "isarnproject.sketches.udt.tdigest" + + @classmethod + def scalaUDT(cls): + return "org.apache.spark.isarnproject.sketches.udt.TDigestArrayUDT" + + def simpleString(self): + return "tdigestarray" + + def serialize(self, obj): + if isinstance(obj, TDigestList): + clustS = [] + clustX = [] + clustM = [] + for td in obj: + clustS.append(td.nclusters) + clustX += [float(v) for v in td.clustX] + clustM += [float(v) for v in td.clustM] + delta = obj[0].delta if len(obj) > 0 else 0.5 + maxDiscrete = obj[0].maxDiscrete if len(obj) > 0 else 0 + return (delta, maxDiscrete, clustS, clustX, clustM) + else: + raise TypeError("cannot serialize %r of type %r" % (obj, type(obj))) + + def deserialize(self, datum): + delta = datum[0] + maxDiscrete = datum[1] + clusterS = datum[2] + clusterX = datum[3] + clusterM = datum[4] + tdlist = TDigestList() + b = 0 + for s in clusterS: + clustX = clusterX[b:b+s] + clustM = clusterM[b:b+s] + tdlist.append(TDigest(delta, maxDiscrete, s, clustX, clustM)) + b += s + return tdlist + +class TDigestList(list): + """ + A subclass of a list of TDigest objects, deserialized from a Dataset. 
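+    Example (illustrative; ``grouped`` is a DataFrame with a column "tdigests" of t-digest arrays):
+
+    >>> td = grouped.agg(tdigestArrayReduceUDAF("tdigests").alias("tdigest"))
+    >>> [t.cdfInverse(0.5) for t in td.first()[0]]
+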
This subclass carries a __UDT__ attribute (TDigestArrayUDT) so that Spark can deserialize a t-digest array column into it.
+ Returns NaN for any q > 1 or < 0, or if this TDigest is empty. + """ + q = float(qq) + if (q < 0.0) or (q > 1.0): + return float('nan') + if self.isEmpty(): + return float('nan') + jcov = self.__covM__(q * self.mass()) + cov = jcov.map(lambda j: (self.clustX[j], self.clustM[j])) + m = q * self.mass() + if (not cov.l.isEmpty()) and (not cov.r.isEmpty()): + c1, tm1 = cov.l.get() + c2, tm2 = cov.r.get() + return self.__cdfI__(jcov.l.get(), m, c1, tm1, c2, tm2) + if not cov.r.isEmpty(): + c = cov.r.get()[0] + jcovR = self.__covR__(c) + covR = jcovR.map(lambda j: (self.clustX[j], self.clustM[j])) + if (not covR.l.isEmpty()) and (not covR.r.isEmpty()): + c1, tm1 = covR.l.get() + c2, tm2 = covR.r.get() + return self.__cdfI__(jcovR.l.get(), m, c1, tm1, c2, tm2) + return float('nan') + if not cov.l.isEmpty(): + c = cov.l.get()[0] + return c + return float('nan') + + def cdfDiscrete(self, xx): + """ + return CDF(x) for a numeric value x, assuming the sketch is representing a + discrete distribution. + """ + if self.isEmpty(): + return 0.0 + j = np.searchsorted(self.clustX, float(xx), side='right') + if (j == 0): + return 0.0 + return self.clustP[j - 1] / self.mass() + + def cdfDiscreteInverse(self, qq): + """ + Given a value q on [0,1], return the value x such that CDF(x) = q, assuming + the sketch is represenging a discrete distribution. + Returns NaN for any q > 1 or < 0, or if this TDigest is empty. + """ + q = float(qq) + if (q < 0.0) or (q > 1.0): + return float('nan') + if self.isEmpty(): + return float('nan') + j = np.searchsorted(self.clustP, q * self.mass(), side='left') + return self.clustX[j] + + def samplePDF(self): + """ + Return a random sampling from the sketched distribution, using inverse + transform sampling, assuming a continuous distribution. + """ + return self.cdfInverse(random.random()) + + def samplePMF(self): + """ + Return a random sampling from the sketched distribution, using inverse + transform sampling, assuming a discrete distribution. + """ + return self.cdfDiscreteInverse(random.random()) + + def sample(self): + """ + Return a random sampling from the sketched distribution, using inverse + transform sampling, assuming a discrete distribution if the number of + TDigest clusters is <= maxDiscrete, and a continuous distribution otherwise. + """ + if self.maxDiscrete <= self.nclusters: + return self.cdfDiscreteInverse(random.random()) + return self.cdfInverse(random.random()) + +class Cover(object): + """ + Analog of org.isarnproject.collections.mixmaps.nearest.Cover[Any]. + Not intended for public consumption. + """ + def __repr__(self): + return "Cover(%s, %s)" % (repr(self.l), repr(self.r)) + + def __init__(self, l, r): + self.l = Option(l) + self.r = Option(r) + + def map(self, f): + assert hasattr(f, '__call__'), "f must be callable" + return Cover(self.l.map(f).value, self.r.map(f).value) + +class Option(object): + """ + Analog of Scala Option[Any]. I only implemented the methods I need. + Not intented for public consumption. 
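+        For example, cdfInverse(0.5) returns the median of the sketched distribution.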
+ """ + def __repr__(self): + return "Option(%s)" % (repr(self.value)) + + def __init__(self, v): + self.value = v + + def get(self): + assert self.value is not None, "Opt value was None" + return self.value + + def map(self, f): + assert hasattr(f, '__call__'), "f must be callable" + if self.value is None: + return Option(None) + return Option(f(self.value)) + + def isEmpty(self): + return self.value is None diff --git a/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala b/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala index 267bd24..de1e1ce 100644 --- a/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala +++ b/src/main/scala/org/apache/spark/isarnproject/sketches/udt/TDigestUDT.scala @@ -37,6 +37,21 @@ case class TDigestSQL(tdigest: TDigest) class TDigestUDT extends UserDefinedType[TDigestSQL] { def userClass: Class[TDigestSQL] = classOf[TDigestSQL] + override def pyUDT: String = "isarnproject.sketches.udt.tdigest.TDigestUDT" + + override def typeName: String = "tdigest" + + override def equals(obj: Any): Boolean = { + obj match { + case _: TDigestUDT => true + case _ => false + } + } + + override def hashCode(): Int = classOf[TDigestUDT].getName.hashCode() + + private[spark] override def asNullable: TDigestUDT = this + def sqlType: DataType = StructType( StructField("delta", DoubleType, false) :: StructField("maxDiscrete", IntegerType, false) :: @@ -95,6 +110,21 @@ case class TDigestArraySQL(tdigests: Array[TDigest]) class TDigestArrayUDT extends UserDefinedType[TDigestArraySQL] { def userClass: Class[TDigestArraySQL] = classOf[TDigestArraySQL] + override def pyUDT: String = "isarnproject.sketches.udt.tdigest.TDigestArrayUDT" + + override def typeName: String = "tdigestarray" + + override def equals(obj: Any): Boolean = { + obj match { + case _: TDigestArrayUDT => true + case _ => false + } + } + + override def hashCode(): Int = classOf[TDigestArrayUDT].getName.hashCode() + + private[spark] override def asNullable: TDigestArrayUDT = this + // Spark seems to have trouble with ArrayType data that isn't // serialized using UnsafeArrayData (SPARK-21277), so my workaround // is to store all the cluster information flattened into single Unsafe arrays. diff --git a/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala b/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala index 3bb4bd7..84019ae 100644 --- a/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala +++ b/src/main/scala/org/isarnproject/sketches/udaf/TDigestUDAF.scala @@ -222,3 +222,126 @@ case class TDigestArrayUDAF[N](deltaV: Double, maxDiscreteV: Int)(implicit } } } + +/** + * A UDAF for aggregating (reducing) a column of t-digests. + * Expected to be created using [[tdigestReduceUDAF]]. 
+ * @param deltaV The delta value to be used by the TDigest objects + * @param maxDiscreteV The maxDiscrete value to be used by the TDigest objects + */ +case class TDigestReduceUDAF(deltaV: Double, maxDiscreteV: Int) extends + UserDefinedAggregateFunction { + + /** customize the delta value to be used by the TDigest object */ + def delta(deltaP: Double) = this.copy(deltaV = deltaP) + + /** customize the maxDiscrete value to be used by the TDigest object */ + def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP) + + def deterministic: Boolean = false + + def inputSchema: StructType = StructType(StructField("tdigest", TDigestUDT) :: Nil) + + def bufferSchema: StructType = StructType(StructField("tdigest", TDigestUDT) :: Nil) + + def dataType: DataType = TDigestUDT + + def initialize(buf: MutableAggregationBuffer): Unit = { + buf(0) = TDigestSQL(TDigest.empty(deltaV, maxDiscreteV)) + } + + def update(buf: MutableAggregationBuffer, input: Row): Unit = this.merge(buf, input) + + def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = { + if (!buf2.isNullAt(0)) { + buf1(0) = TDigestSQL(buf1.getAs[TDigestSQL](0).tdigest ++ buf2.getAs[TDigestSQL](0).tdigest) + } + } + + def evaluate(buf: Row): Any = buf.getAs[TDigestSQL](0) +} + +/** + * A UDAF for aggregating (reducing) a column of t-digest vectors. + * Expected to be created using [[tdigestArrayReduceUDAF]]. + * @param deltaV The delta value to be used by the TDigest objects + * @param maxDiscreteV The maxDiscrete value to be used by the TDigest objects + */ +case class TDigestArrayReduceUDAF(deltaV: Double, maxDiscreteV: Int) extends + UserDefinedAggregateFunction { + + /** customize the delta value to be used by the TDigest object */ + def delta(deltaP: Double) = this.copy(deltaV = deltaP) + + /** customize the maxDiscrete value to be used by the TDigest object */ + def maxDiscrete(maxDiscreteP: Int) = this.copy(maxDiscreteV = maxDiscreteP) + + def deterministic: Boolean = false + + def inputSchema: StructType = StructType(StructField("tdigests", TDigestArrayUDT) :: Nil) + + def bufferSchema: StructType = StructType(StructField("tdigests", TDigestArrayUDT) :: Nil) + + def dataType: DataType = TDigestArrayUDT + + def initialize(buf: MutableAggregationBuffer): Unit = { + buf(0) = TDigestArraySQL(Array.empty[TDigest]) + } + + def update(buf: MutableAggregationBuffer, input: Row): Unit = this.merge(buf, input) + + def merge(buf1: MutableAggregationBuffer, buf2: Row): Unit = { + if (!buf2.isNullAt(0)) { + val tds2 = buf2.getAs[TDigestArraySQL](0).tdigests + if (!tds2.isEmpty) { + val tdt = buf1.getAs[TDigestArraySQL](0).tdigests + val tds1 = if (!tdt.isEmpty) tdt else { + Array.fill(tds2.length) { TDigest.empty(deltaV, maxDiscreteV) } + } + require(tds1.length == tds2.length) + for { j <- 0 until tds1.length } { tds1(j) ++= tds2(j) } + buf1(0) = TDigestArraySQL(tds1) + } + } + } + + def evaluate(buf: Row): Any = buf.getAs[TDigestArraySQL](0) +} + +object pythonBindings { + def tdigestIntUDAF(delta: Double, maxDiscrete: Int) = + TDigestUDAF[Int](delta, maxDiscrete) + + def tdigestLongUDAF(delta: Double, maxDiscrete: Int) = + TDigestUDAF[Long](delta, maxDiscrete) + + def tdigestFloatUDAF(delta: Double, maxDiscrete: Int) = + TDigestUDAF[Float](delta, maxDiscrete) + + def tdigestDoubleUDAF(delta: Double, maxDiscrete: Int) = + TDigestUDAF[Double](delta, maxDiscrete) + + def tdigestMLVecUDAF(delta: Double, maxDiscrete: Int) = + TDigestMLVecUDAF(delta, maxDiscrete) + + def tdigestMLLibVecUDAF(delta: Double, maxDiscrete: Int) = 
+ TDigestMLLibVecUDAF(delta, maxDiscrete) + + def tdigestIntArrayUDAF(delta: Double, maxDiscrete: Int) = + TDigestArrayUDAF[Int](delta, maxDiscrete) + + def tdigestLongArrayUDAF(delta: Double, maxDiscrete: Int) = + TDigestArrayUDAF[Long](delta, maxDiscrete) + + def tdigestFloatArrayUDAF(delta: Double, maxDiscrete: Int) = + TDigestArrayUDAF[Float](delta, maxDiscrete) + + def tdigestDoubleArrayUDAF(delta: Double, maxDiscrete: Int) = + TDigestArrayUDAF[Double](delta, maxDiscrete) + + def tdigestReduceUDAF(delta: Double, maxDiscrete: Int) = + TDigestReduceUDAF(delta, maxDiscrete) + + def tdigestArrayReduceUDAF(delta: Double, maxDiscrete: Int) = + TDigestArrayReduceUDAF(delta, maxDiscrete) +} diff --git a/src/main/scala/org/isarnproject/sketches/udaf/package.scala b/src/main/scala/org/isarnproject/sketches/udaf/package.scala index 9f2f536..45c8228 100644 --- a/src/main/scala/org/isarnproject/sketches/udaf/package.scala +++ b/src/main/scala/org/isarnproject/sketches/udaf/package.scala @@ -97,6 +97,66 @@ package object udaf { */ def tdigestMLLibVecUDAF = TDigestMLLibVecUDAF(TDigest.deltaDefault, 0) + /** + * Obtain a UDAF for aggregating (reducing) a column (or grouping) of t-digests + * @return A UDAF that can be applied to a column or grouping of t-digests + * @example + * {{{ + * scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._ + * // column of t-digests (might also be result of aggregating from groupBy) + * scala> tds.show() + * +--------------------+ + * | tdigests| + * +--------------------+ + * |TDigestSQL(TDiges...| + * |TDigestSQL(TDiges...| + * |TDigestSQL(TDiges...| + * +--------------------+ + * + * // apply tdigestReduceUDAF to reduce the t-digests to a single combined t-digest + * scala> val td = tds.agg(tdigestReduceUDAF($"tdigests").alias("tdigest")) + * td: org.apache.spark.sql.DataFrame = [tdigest: tdigest] + * + * scala> td.show() + * +--------------------+ + * | tdigest| + * +--------------------+ + * |TDigestSQL(TDiges...| + * +--------------------+ + * }}} + */ + def tdigestReduceUDAF = TDigestReduceUDAF(TDigest.deltaDefault, 0) + + /** + * Obtain a UDAF for aggregating (reducing) a column of t-digest vectors + * @return A UDAF that can be applied to a column of t-digest vectors + * @example + * {{{ + * scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._ + * // column of t-digest arrays (might also be result of aggregating from groupBy) + * scala> tds.show() + * +--------------------+ + * | tdarrays| + * +--------------------+ + * |TDigestArraySQL([...| + * |TDigestArraySQL([...| + * |TDigestArraySQL([...| + * +--------------------+ + * + * // apply tdigestArrayReduceUDAF to reduce the t-digest arrays to single array + * scala> val td = tds.agg(tdigestArrayReduceUDAF($"tdigests").alias("tdarray")) + * td: org.apache.spark.sql.DataFrame = [tdarray: tdigestarray] + * + * scala> td.show() + * +---------------------+ + * | tdarray| + * +---------------------+ + * | TDigestArraySQL([...| + * +---------------------+ + * }}} + */ + def tdigestArrayReduceUDAF = TDigestArrayReduceUDAF(TDigest.deltaDefault, 0) + /** implicitly unpack a TDigestSQL to extract its TDigest payload */ implicit def implicitTDigestSQLToTDigest(tdsql: TDigestSQL): TDigest = tdsql.tdigest
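+
+  // Note: these factory methods back the Python bindings, which reach them over Py4J as
+  //   sc._jvm.org.isarnproject.sketches.udaf.pythonBindings.tdigestMLVecUDAF(delta, maxDiscrete).apply
+  // and then apply the result to the wrapped column (see python/isarnproject/sketches/udaf/tdigest.py).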