Skip to content

Commit

Permalink
Toward release 0.5.0 - fast TDigest and Spark-3 Aggregator API (#20)
Browse files Browse the repository at this point in the history
* update isarn-sketches dep to 0.2.2

* test design with thin shim class for new fast TDigest to clean up the API

* update initial commands

* scope t digest shim class, TDigestAggregator companion obj

* bump isarn-sketches to 0.3.0

* example of a java/python binding

* modify python tdigest UDT and a test UDF

* ScalarNumeric, data type functions, python renaming, commenting out old code

* spark 3.0 supports scala 2.12 only

* http -> https

* TDigestArrayAggregator

* array function overloadings

* add instructions for cleaning out ivy on local publish

* spark vector aggregations

* no longer need UDT for tdigest array

* old tdigest UDTs are obsolete

* remove package object

* sketches.spark.tdigest._

* tdigest.scala

* TDigestReduceAggregator

* TDigestArrayReduceAggregator

* TDigestUDAF.scala is obsolete

* TDigestArrayReduceAggregator inherit from TDigestArrayAggregatorBase

* factor out compression and maxdiscrete from TDigestArrayAggregatorBase

* /udaf/ -> /spark/

* /udaf/ -> /spark/

* move python TDigestUDT into spark/tdigest.py

* update sbt build mappings for python refactor

* update readme python for new organization

* copyright

* unused imports

* more unused imports

* switch to fast java TDigest

* explicit import of JavaPredictionModel

* /pipelines/ -> /pipelines/spark/

* python/isarnproject/pipelines/__init__.py

* update build mappings for new python organization

* update package paths for new organization

* fix package object path

* update copyright

* update pyspark tdigest to be cleaner and analogous to java implementation

* spark pipeline param delta -> compression

* fi.scala

* update assembly dep and move it into plugins

* add scaladoc

* move ScalarNumeric out of tdigest specific package

* update README examples for scala

* spark.sparkContext

* update python tdigest examples

* update feature importance examples

* isarn-sketches-java

* utest harness for spark testing

* TDigestAggregator test

* TDigestAggregator test

* KSD cumulative distribution divergence measure for unit testing

* test counts

* BigDecimal range

* local build against spark-3.0.1-SNAPSHOT

* test TDigestArrayAggregator

* tests for spark ML vector types

* cache test data sets

* test tdigest reducing aggregators

* epsD

* move approx to base class

* disable parallel test execution to prevent spark cluster teardown race conditions

* feature importance unit test

* build against spark 3.0.1

* xsbt -> sbt

* 0.5.0
  • Loading branch information
erikerlandson authored Sep 15, 2020
1 parent a8ec2b4 commit e7d3136
Show file tree
Hide file tree
Showing 20 changed files with 1,642 additions and 1,573 deletions.
514 changes: 211 additions & 303 deletions README.md

Large diffs are not rendered by default.

47 changes: 28 additions & 19 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
// xsbt clean unidoc previewSite
// xsbt clean unidoc ghpagesPushSite
// xsbt +publish
// sbt clean unidoc previewSite
// sbt clean unidoc ghpagesPushSite
// sbt +publish
// https://oss.sonatype.org
// make sure sparkVersion is set as you want prior to +publish
// when doing localPublish, also do:
// rm -rf /home/eje/.ivy2/local/org.isarnproject /home/eje/.ivy2/cache/org.isarnproject

import scala.sys.process._

name := "isarn-sketches-spark"

organization := "org.isarnproject"

val packageVersion = "0.4.1-SNAPSHOT"
val packageVersion = "0.5.0"

val sparkVersion = "3.0.0"
val sparkVersion = "3.0.1"

val sparkSuffix = s"""sp${sparkVersion.split('.').take(2).mkString(".")}"""

version := s"${packageVersion}-${sparkSuffix}"

scalaVersion := "2.12.11"

crossScalaVersions := Seq("2.12.11") // scala 2.12.11 when spark supports it
crossScalaVersions := Seq("2.12.11")

pomIncludeRepository := { _ => false }

Expand Down Expand Up @@ -54,14 +56,22 @@ developers := List(
)
)

resolvers += Resolver.mavenLocal

libraryDependencies ++= Seq(
"org.isarnproject" %% "isarn-sketches" % "0.1.2",
"org.isarnproject" % "isarn-sketches-java" % "0.3.0",
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-mllib" % sparkVersion % Provided,
"org.isarnproject" %% "isarn-scalatest" % "0.0.3" % Test,
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
"org.apache.commons" % "commons-math3" % "3.6.1" % Test)
"com.lihaoyi" %% "utest" % "0.7.4" % Test)

// tell sbt about utest
testFrameworks += new TestFramework("utest.runner.Framework")

// default is to run tests in parallel, asynchronously, but
// that breaks both spark-cluster setup and teardown, and also breaks
// repeatability of the random data generation
parallelExecution in Test := false

initialCommands in console := """
|import org.apache.spark.SparkConf
Expand All @@ -70,10 +80,10 @@ initialCommands in console := """
|import org.apache.spark.SparkContext._
|import org.apache.spark.rdd.RDD
|import org.apache.spark.ml.linalg.Vectors
|import org.isarnproject.sketches.TDigest
|import org.isarnproject.sketches.udaf._
|import org.apache.spark.isarnproject.sketches.udt._
|val initialConf = new SparkConf().setAppName("repl").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer", "16mb")
|import org.apache.spark.sql.functions._
|import org.isarnproject.sketches.java.TDigest
|import org.isarnproject.sketches.spark._
|val initialConf = new SparkConf().setAppName("repl")
|val spark = SparkSession.builder.config(initialConf).master("local[2]").getOrCreate()
|import spark._, spark.implicits._
|val sc = spark.sparkContext
Expand All @@ -90,12 +100,11 @@ scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
mappings in (Compile, packageBin) ++= Seq(
(baseDirectory.value / "python" / "isarnproject" / "__init__.py") -> "isarnproject/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "pipelines" / "__init__.py") -> "isarnproject/pipelines/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "pipelines" / "fi.py") -> "isarnproject/pipelines/fi.py",
(baseDirectory.value / "python" / "isarnproject" / "pipelines" / "spark" / "__init__.py") -> "isarnproject/pipelines/spark/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "pipelines" / "spark" / "fi.py") -> "isarnproject/pipelines/spark/fi.py",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "__init__.py") -> "isarnproject/sketches/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "__init__.py") -> "isarnproject/sketches/udaf/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "tdigest.py") -> "isarnproject/sketches/udaf/tdigest.py",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "__init__.py") -> "isarnproject/sketches/udt/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udt" / "tdigest.py") -> "isarnproject/sketches/udt/tdigest.py"
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "spark" / "__init__.py") -> "isarnproject/sketches/spark/__init__.py",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "spark" / "tdigest.py") -> "isarnproject/sketches/spark/tdigest.py",
)

test in assembly := {}
Expand Down
1 change: 0 additions & 1 deletion project/assembly.sbt

This file was deleted.

6 changes: 4 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
resolvers += Resolver.url(
"bintray-sbt-plugin-releases",
url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
url("https://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
Resolver.ivyStylePatterns)

resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"

resolvers += "jgit-repo" at "http://download.eclipse.org/jgit/maven"
resolvers += "jgit-repo" at "https://download.eclipse.org/jgit/maven"

addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.6.3")

Expand All @@ -15,6 +15,8 @@ addSbtPlugin("io.crashbox" % "sbt-gpg" % "0.2.1")

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.2")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")

// scoverage and coveralls deps are at old versions to avoid a bug in the current versions
// update these when this fix is released: https://github.com/scoverage/sbt-coveralls/issues/73
//addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.0.4")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pyspark import since, keyword_only
from pyspark.ml.param.shared import *
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper, JavaPredictionModel
from pyspark.ml.common import inherit_doc
from pyspark.sql import DataFrame

Expand All @@ -14,19 +14,19 @@ def toPredictionModel(value):
raise TypeError("object %s was not a JavaPredictionModel" % (value))

class TDigestParams(Params):
delta = Param(Params._dummy(), "delta", "tdigest compression parameter",
compression = Param(Params._dummy(), "compression", "tdigest compression parameter",
typeConverter=TypeConverters.toFloat)
maxDiscrete = Param(Params._dummy(), "maxDiscrete", "maximum discrete values",
typeConverter=TypeConverters.toInt)

def __init__(self):
super(TDigestParams, self).__init__()

def setDelta(self, value):
return self._set(delta=value)
def setCompression(self, value):
return self._set(compression=value)

def getDelta(self):
return self.getOrDefault(self.delta)
def getCompression(self):
return self.getOrDefault(self.compression)

def setMaxDiscrete(self, value):
return self._set(maxDiscrete=value)
Expand Down Expand Up @@ -90,15 +90,15 @@ class TDigestFI(JavaEstimator, TDigestFIParams, JavaMLWritable, JavaMLReadable):
"""

@keyword_only
def __init__(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"):
def __init__(self, compression = 0.5, maxDiscrete = 0, featuresCol = "features"):
super(TDigestFI, self).__init__()
self._java_obj = self._new_java_obj("org.isarnproject.pipelines.TDigestFI", self.uid)
self._setDefault(delta = 0.5, maxDiscrete = 0, featuresCol = "features")
self._java_obj = self._new_java_obj("org.isarnproject.pipelines.spark.fi.TDigestFI", self.uid)
self._setDefault(compression = 0.5, maxDiscrete = 0, featuresCol = "features")
kwargs = self._input_kwargs
self.setParams(**kwargs)

@keyword_only
def setParams(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"):
def setParams(self, compression = 0.5, maxDiscrete = 0, featuresCol = "features"):
kwargs = self._input_kwargs
return self._set(**kwargs)

Expand Down
Loading

0 comments on commit e7d3136

Please sign in to comment.