Commit: pyspark support (#5)

* assembly

* test some objects consumable from pyspark

* experimenting with pyc files in jar

* a working prototype of scala & python in same maven jar file

* draft of python-side TDigestUDT and TDigest

* cdf method for python TDigest

* cdfInverse, also some doc

* comment

* cdfDiscrete, cdfDiscreteInverse, and sampling methods

* Restore pyc in jars, clean up python bindings for UDAFs

* fill in some missing method overrides for TDigestArrayUDT

* fix typo tdigestFloatUDAF

* TDigestArrayUDT for python

* T-Digest reducing UDAFs

* add some doc

* document reducing variants

* bump version, add sonatype publishing, py/spark version suffixes

* python examples for readme
erikerlandson authored Sep 2, 2017
1 parent 00a7c3f commit 16af0ef
Showing 13 changed files with 1,020 additions and 6 deletions.
205 changes: 205 additions & 0 deletions README.md
@@ -141,3 +141,208 @@ sampleV: Array[Double] = Array(0.10298190759496548, -0.1968752746464183, -1.0139
scala> val medianV = firstV.getAs[TDigestArraySQL](0).tdigests.map(_.cdfInverse(0.5))
medianV: Array[Double] = Array(0.025820266848484798, 0.01951778217339037, 0.09701138847692858)
```

### Reduce a column (or grouping) of T-Digests
```scala
scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._, org.isarnproject.sketches._, scala.util.Random._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches._
import scala.util.Random._

scala> val x = sc.parallelize(Vector.fill(1000) { nextGaussian }).toDF("x")
x: org.apache.spark.sql.DataFrame = [x: double]

scala> val g = sc.parallelize(Seq(1,2,3,4,5)).toDF("g")
g: org.apache.spark.sql.DataFrame = [g: int]

scala> val data = g.crossJoin(x)
data: org.apache.spark.sql.DataFrame = [g: int, x: double]

scala> val udaf = tdigestUDAF[Double]
udaf: org.isarnproject.sketches.udaf.TDigestUDAF[Double] = TDigestUDAF(0.5,0)

scala> val tds = data.groupBy("g").agg(udaf($"x").alias("tdigests"))
tds: org.apache.spark.sql.DataFrame = [g: int, tdigests: tdigest]

scala> tds.show()
+---+--------------------+
| g| tdigests|
+---+--------------------+
| 1|TDigestSQL(TDiges...|
| 3|TDigestSQL(TDiges...|
| 5|TDigestSQL(TDiges...|
| 4|TDigestSQL(TDiges...|
| 2|TDigestSQL(TDiges...|
+---+--------------------+

scala> val td = tds.agg(tdigestReduceUDAF($"tdigests").alias("tdigest"))
td: org.apache.spark.sql.DataFrame = [tdigest: tdigest]

scala> td.show()
+--------------------+
| tdigest|
+--------------------+
|TDigestSQL(TDiges...|
+--------------------+
```

### Reduce a column (or grouping) of T-Digest Arrays
```scala
scala> import org.isarnproject.sketches.udaf._, org.apache.spark.isarnproject.sketches.udt._, org.isarnproject.sketches._, scala.util.Random._
import org.isarnproject.sketches.udaf._
import org.apache.spark.isarnproject.sketches.udt._
import org.isarnproject.sketches._
import scala.util.Random._

scala> val x = sc.parallelize(Vector.fill(1000) { Vector.fill(3) { nextGaussian } }).toDF("x")
x: org.apache.spark.sql.DataFrame = [x: array<double>]

scala> val g = sc.parallelize(Seq(1,2,3,4,5)).toDF("g")
g: org.apache.spark.sql.DataFrame = [g: int]

scala> val data = g.crossJoin(x)
data: org.apache.spark.sql.DataFrame = [g: int, x: array<double>]

scala> val udaf = tdigestArrayUDAF[Double]
udaf: org.isarnproject.sketches.udaf.TDigestArrayUDAF[Double] = TDigestArrayUDAF(0.5,0)

scala> val tds = data.groupBy("g").agg(udaf($"x").alias("tdigests"))
tds: org.apache.spark.sql.DataFrame = [g: int, tdigests: tdigestarray]

scala> tds.show()
+---+--------------------+
| g| tdigests|
+---+--------------------+
| 1|TDigestArraySQL([...|
| 3|TDigestArraySQL([...|
| 5|TDigestArraySQL([...|
| 4|TDigestArraySQL([...|
| 2|TDigestArraySQL([...|
+---+--------------------+

scala> val td = tds.agg(tdigestArrayReduceUDAF($"tdigests"))
td: org.apache.spark.sql.DataFrame = [tdigestarrayreduceudaf(tdigests): tdigestarray]

scala> td.show()
+--------------------------------+
|tdigestarrayreduceudaf(tdigests)|
+--------------------------------+
| TDigestArraySQL([...|
+--------------------------------+
```

### Sketch a numeric column (python)
```python
>>> from isarnproject.sketches.udaf.tdigest import *
>>> from random import gauss
>>> from pyspark.sql.types import *
>>> data = sc.parallelize([[gauss(0,1)] for x in xrange(1000)]).toDF(StructType([StructField("x", DoubleType())]))
>>> agg = data.agg(tdigestDoubleUDAF("x"))
>>> td = agg.first()[0]
>>> td.cdfInverse(0.5)
0.046805581998797419
>>>
```
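The commit log above also mentions `cdf`, `cdfDiscrete`, `cdfDiscreteInverse`, and sampling methods on the Python-side `TDigest`. A minimal sketch of querying the collected sketch `td` from the example above; outputs are omitted, and the sampling method name `samplePDF` is an assumption carried over from the Scala `TDigest` API rather than a confirmed binding:

```python
>>> td.cdf(0.0)               # mass at or below 0.0; near 0.5 for this standard-normal sample
>>> td.cdfInverse(0.9)        # estimated 90th percentile
>>> td.cdfDiscrete(0.0)       # discrete-distribution variant of cdf
>>> td.samplePDF()            # draw one random sample from the sketched density (name assumed)
>>>
```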

### Sketch a numeric array column (python)
```python
>>> from isarnproject.sketches.udaf.tdigest import *
>>> from random import gauss
>>> from pyspark.sql.types import *
>>> data = sc.parallelize([[[gauss(0,1),gauss(0,1),gauss(0,1)]] for x in xrange(1000)]).toDF(StructType([StructField("x", ArrayType(DoubleType()))]))
>>> agg = data.agg(tdigestDoubleArrayUDAF("x"))
>>> tds = agg.first()[0]
>>> [t.cdfInverse(0.5) for t in tds]
[0.046116924117141189, -0.011071666930287466, -0.019006033872431105]
>>>
```

### Sketch a column of ML Vectors (python)
```python
>>> from isarnproject.sketches.udaf.tdigest import *
>>> from random import gauss
>>> from pyspark.ml.linalg import VectorUDT, Vectors
>>> from pyspark.sql.types import *
>>> data = sc.parallelize([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in xrange(1000)]).toDF(StructType([StructField("x", VectorUDT())]))
>>> agg = data.agg(tdigestMLVecUDAF("x"))
>>> tds = agg.first()[0]
>>> [t.cdfInverse(0.5) for t in tds]
[0.02859498787770634, -0.0027338622700039117, 0.041590980872883487]
>>>
```

### Sketch a column of MLLib Vectors (python)
```python
>>> from isarnproject.sketches.udaf.tdigest import *
>>> from random import gauss
>>> from pyspark.mllib.linalg import VectorUDT, Vectors
>>> from pyspark.sql.types import *
>>> data = sc.parallelize([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in xrange(1000)]).toDF(StructType([StructField("x", VectorUDT())]))
>>> agg = data.agg(tdigestMLLibVecUDAF("x"))
>>> tds = agg.first()[0]
>>> [t.cdfInverse(0.5) for t in tds]
[0.02859498787770634, -0.0027338622700039117, 0.041590980872883487]
>>>
```

### Reduce a column (or grouping) of T-Digests (python)
```python
>>> from isarnproject.sketches.udaf.tdigest import *
>>> from random import gauss
>>> from pyspark.sql.types import *
>>> x = sc.parallelize([[gauss(0,1)] for x in xrange(1000)]).toDF(StructType([StructField("x", DoubleType())]))
>>> g = sc.parallelize([[1+i] for i in xrange(5)]).toDF(StructType([StructField("g", IntegerType())]))
>>> data = g.crossJoin(x)
>>> tds = data.groupBy("g").agg(tdigestDoubleUDAF("x").alias("tdigests"))
>>> tds.show()
+---+--------------------+
| g| tdigests|
+---+--------------------+
| 1|TDigestSQL(TDiges...|
| 3|TDigestSQL(TDiges...|
| 5|TDigestSQL(TDiges...|
| 4|TDigestSQL(TDiges...|
| 2|TDigestSQL(TDiges...|
+---+--------------------+

>>> td = tds.agg(tdigestReduceUDAF("tdigests").alias("tdigest"))
>>> td.show()
+--------------------+
| tdigest|
+--------------------+
|TDigestSQL(TDiges...|
+--------------------+

>>>
```
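As in the single-column sketching example above, the reduced result can be collected back to the driver and queried directly; a brief sketch (output omitted):

```python
>>> reduced = td.first()[0]   # deserializes to the python-side TDigest
>>> reduced.cdfInverse(0.5)   # median of the combined sketch
>>>
```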

### Reduce a column (or grouping) of T-Digest Arrays (python)
```python
>>> from isarnproject.sketches.udaf.tdigest import *
>>> from random import gauss
>>> from pyspark.ml.linalg import VectorUDT, Vectors
>>> from pyspark.sql.types import *
>>> x = sc.parallelize([[Vectors.dense([gauss(0,1),gauss(0,1),gauss(0,1)])] for x in xrange(1000)]).toDF(StructType([StructField("x", VectorUDT())]))
>>> g = sc.parallelize([[1+i] for i in xrange(5)]).toDF(StructType([StructField("g", IntegerType())]))
>>> data = g.crossJoin(x)
>>> tds = data.groupBy("g").agg(tdigestMLVecUDAF("x").alias("tdigests"))
>>> tds.show()
+---+--------------------+
| g| tdigests|
+---+--------------------+
| 1|TDigestArraySQL([...|
| 3|TDigestArraySQL([...|
| 5|TDigestArraySQL([...|
| 4|TDigestArraySQL([...|
| 2|TDigestArraySQL([...|
+---+--------------------+

>>> td = tds.agg(tdigestArrayReduceUDAF("tdigests").alias("tdigest"))
>>> td.show()
+--------------------+
| tdigest|
+--------------------+
|TDigestArraySQL([...|
+--------------------+
```
105 changes: 99 additions & 6 deletions build.sbt
@@ -4,19 +4,65 @@ organization := "org.isarnproject"

bintrayOrganization := Some("isarn")

version := "0.1.0"
val packageVersion = "0.2.0"

val sparkVersion = "2.2.0"

val pythonVersion = "2.7"

val sparkSuffix = s"""sp${sparkVersion.split('.').take(2).mkString(".")}"""

val pythonSuffix = s"""py${pythonVersion.split('.').take(2).mkString(".")}"""

val pythonCMD = s"""python${pythonVersion.split('.').head}"""

version := s"${packageVersion}-${sparkSuffix}-${pythonSuffix}"
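// yields e.g. "0.2.0-sp2.2-py2.7" with the packageVersion, sparkVersion, and pythonVersion defined above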

scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.10.6", "2.11.8")

useGpg := true

pomIncludeRepository := { _ => false }

publishMavenStyle := true

publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}

licenses += ("Apache-2.0", url("http://opensource.org/licenses/Apache-2.0"))

homepage := Some(url("https://github.com/isarn/isarn-sketches-spark"))

scmInfo := Some(
ScmInfo(
url("https://github.com/isarn/isarn-sketches-spark"),
"scm:[email protected]:isarn/isarn-sketches-spark.git"
)
)

developers := List(
Developer(
id = "erikerlandson",
name = "Erik Erlandson",
email = "[email protected]",
url = url("https://erikerlandson.github.io/")
)
)

def commonSettings = Seq(
libraryDependencies ++= Seq(
"org.isarnproject" %% "isarn-sketches" % "0.1.0",
"org.apache.spark" %% "spark-core" % "2.1.0" % Provided,
"org.apache.spark" %% "spark-sql" % "2.1.0" % Provided,
"org.apache.spark" %% "spark-mllib" % "2.1.0" % Provided,
"org.isarnproject" %% "isarn-scalatest" % "0.0.1" % Test,
"org.isarnproject" %% "isarn-sketches" % "0.1.1",
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-mllib" % sparkVersion % Provided,
"org.isarnproject" %% "isarn-scalatest" % "0.0.2" % Test,
"org.scalatest" %% "scalatest" % "2.2.4" % Test,
"org.apache.commons" % "commons-math3" % "3.6.1" % Test),
initialCommands in console := """
@@ -45,6 +91,53 @@ licenses += ("Apache-2.0", url("http://opensource.org/licenses/Apache-2.0"))

scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")

lazy val deletePYC = taskKey[Unit]("Delete .pyc files")

deletePYC := {
val s: TaskStreams = streams.value
s.log.info("delete .pyc files...")
val cmd = "bash" :: "-c" :: "rm -f $(find python -name *.pyc)" :: Nil
val stat = (cmd !)
if (stat == 0) {
s.log.info("delete .pyc succeeded")
} else {
throw new IllegalStateException("delete .pyc failed")
}
}

lazy val compilePython = taskKey[Unit]("Compile python files")

compilePython := {
val s: TaskStreams = streams.value
s.log.info("compiling python...")
val stat = (Seq(pythonCMD, "-m", "compileall", "python/") !)
if (stat == 0) {
s.log.info("python compile succeeded")
} else {
throw new IllegalStateException("python compile failed")
}
}

compilePython <<= compilePython.dependsOn(deletePYC)

(packageBin in Compile) <<= (packageBin in Compile).dependsOn(compilePython)

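// bundle the byte-compiled python modules into the jar, so pyspark can import them directly from it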
mappings in (Compile, packageBin) ++= Seq(
(baseDirectory.value / "python" / "isarnproject" / "__init__.pyc") -> "isarnproject/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "__init__.pyc") -> "isarnproject/sketches/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "__init__.pyc") -> "isarnproject/sketches/udaf/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "tdigest.pyc") -> "isarnproject/sketches/udaf/tdigest.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "__init__.pyc") -> "isarnproject/sketches/udt/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "tdigest.pyc") -> "isarnproject/sketches/udt/tdigest.pyc"
)

test in assembly := {}

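// exclude Scala and slf4j classes from the assembly; the Spark runtime already provides them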
assemblyShadeRules in assembly := Seq(
ShadeRule.zap("scala.**").inAll,
ShadeRule.zap("org.slf4j.**").inAll
)

scalacOptions in (Compile, doc) ++= Seq("-doc-root-content", baseDirectory.value+"/root-doc.txt")

site.settings
1 change: 1 addition & 0 deletions project/assembly.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
4 changes: 4 additions & 0 deletions project/plugins.sbt
@@ -13,6 +13,10 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4")

addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.6.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.1")

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.0")

// scoverage and coveralls deps are at old versions to avoid a bug in the current versions
// update these when this fix is released: https://github.com/scoverage/sbt-coveralls/issues/73
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
Empty file added python/isarnproject/__init__.py