Skip to content

Commit

Permalink
feature importance pipelines (#6)
Browse files Browse the repository at this point in the history
* skeleton of params, model and estimator for TDigest feature importance

* draft of TDigestFIEstimator fit()

* model as type parameter, UDAF for FI

* fill in transform

* working draft of importance

* remove Estimator from name

* remove unused field

* remove private val qualifiers

* inheritances

* deviation measure parameter

* private[pipelines]

* feature names

* check length of incoming feature vectors

* document some internal trickery

* Make predModel AnyRef. Sad!

* model becomes parameter

* expose feature importance pipeline to pyspark

* add pipelines pyc to artifact

* scaladoc

* correct method names to setTargetModel / getTargetModel

* README feature importance examples

* fit now uses RDD and aggregate

* replace UDAF for transform with RDD mapPartitions + reduce
  • Loading branch information
erikerlandson authored Sep 23, 2017
1 parent 2d25ffa commit 65447c1
Show file tree
Hide file tree
Showing 6 changed files with 555 additions and 2 deletions.
87 changes: 87 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,90 @@ scala> td.show()
|TDigestArraySQL([...|
+--------------------+
```

### Compute feature importance with respect to a predictive model
```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.ml.regression.LinearRegression

val training = spark.read.format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt")

val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)

val lrModel = lr.fit(training)

import org.isarnproject.pipelines.{TDigestFI,TDigestFIModel}

val fi = new TDigestFI().setDelta(0.3).setMaxDiscrete(10)

val fiMod = fi.fit(training)
.setTargetModel(lrModel)
.setDeviationMeasure("rms-dev")
.setFeatureNames(Array.tabulate(10){j=>s"x$j"})

val imp = fiMod.transform(training)

// Exiting paste mode, now interpreting.

import org.apache.spark.ml.regression.LinearRegression
training: org.apache.spark.sql.DataFrame = [label: double, features: vector]
lr: org.apache.spark.ml.regression.LinearRegression = linReg_ad8ebef9cfe8
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = linReg_ad8ebef9cfe8
import org.isarnproject.pipelines.{TDigestFI, TDigestFIModel}
fi: org.isarnproject.pipelines.TDigestFI = TDigestFI_67b1cff93349
fiMod: org.isarnproject.pipelines.TDigestFIModel = TDigestFI_67b1cff93349
imp: org.apache.spark.sql.DataFrame = [name: string, importance: double]

scala> imp.show
+----+-------------------+
|name| importance|
+----+-------------------+
| x0| 0.0|
| x1|0.27093413867331134|
| x2|0.27512986364699304|
| x3| 1.4284480425303374|
| x4|0.04472982597939822|
| x5| 0.5981079647203551|
| x6| 0.0|
| x7|0.11970670592684969|
| x8| 0.1668815037423663|
| x9|0.17970574939101025|
+----+-------------------+
```

### Compute feature importance with respect to a predictive model (python)
```python
>>> from pyspark.ml.regression import LinearRegression
>>> training = spark.read.format("libsvm") \
... .load("data/mllib/sample_linear_regression_data.txt")
>>> lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
>>> lrModel = lr.fit(training)
>>> from isarnproject.pipelines.fi import *
>>> fi = TDigestFI().setDelta(0.3).setMaxDiscrete(10)
>>> fiMod = fi.fit(training) \
... .setTargetModel(lrModel) \
... .setDeviationMeasure("rms-dev") \
... .setFeatureNames(["x%d" % (j) for j in xrange(10)])
>>> imp = fiMod.transform(training)
>>> imp.show()
+----+-------------------+
|name| importance|
+----+-------------------+
| x0| 0.0|
| x1| 0.2513147892886899|
| x2|0.28992477834838837|
| x3| 1.4906022974248356|
| x4|0.04197189119745892|
| x5| 0.6213459845972947|
| x6| 0.0|
| x7|0.12463038543257152|
| x8|0.17144699470039335|
| x9|0.18428188512840307|
+----+-------------------+
```
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ organization := "org.isarnproject"

bintrayOrganization := Some("isarn")

val packageVersion = "0.2.0"
val packageVersion = "0.3.0-SNAPSHOT"

val sparkVersion = "2.2.0"

val pythonVersion = "3.5"
val pythonVersion = "2.7"

val sparkSuffix = s"""sp${sparkVersion.split('.').take(2).mkString(".")}"""

Expand Down Expand Up @@ -124,6 +124,8 @@ compilePython <<= compilePython.dependsOn(deletePYC)

mappings in (Compile, packageBin) ++= Seq(
(baseDirectory.value / "python" / "isarnproject" / "__init__.pyc") -> "isarnproject/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "pipelines" / "__init__.pyc") -> "isarnproject/pipelines/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "pipelines" / "fi.pyc") -> "isarnproject/pipelines/fi.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "__init__.pyc") -> "isarnproject/sketches/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "__init__.pyc") -> "isarnproject/sketches/udaf/__init__.pyc",
(baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "tdigest.pyc") -> "isarnproject/sketches/udaf/tdigest.pyc",
Expand Down
Empty file.
126 changes: 126 additions & 0 deletions python/isarnproject/pipelines/fi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from pyspark import since, keyword_only
from pyspark.ml.param.shared import *
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
from pyspark.ml.common import inherit_doc
from pyspark.sql import DataFrame

# Public API of this module: the feature-importance estimator and its fitted model.
__all__ = ['TDigestFI', 'TDigestFIModel']

def toPredictionModel(value):
    """
    Param type converter: unwrap the backing JVM object from a pyspark
    ``JavaPredictionModel`` so it can be stored as a Param value.

    :param value: a fitted pyspark prediction model (e.g. a LinearRegressionModel)
    :return: the py4j handle to the underlying JVM model object
    :raises TypeError: if ``value`` is not a ``JavaPredictionModel``
    """
    # NOTE(review): JavaPredictionModel is not in the explicit wrapper import
    # list above; it is presumably picked up by the `pyspark.ml.util` wildcard
    # import (true for Spark 2.x) -- confirm against the targeted pyspark version.
    if not isinstance(value, JavaPredictionModel):
        raise TypeError("object %s was not a JavaPredictionModel" % (value))
    return value._java_obj

class TDigestParams(Params):
    """
    Mixin holding the parameters common to t-digest based pipeline stages:
    the t-digest compression parameter ``delta`` and the maximum number of
    discrete values ``maxDiscrete``.
    """

    # t-digest compression parameter (passed through to the JVM sketch)
    delta = Param(Params._dummy(), "delta", "tdigest compression parameter",
                  typeConverter=TypeConverters.toFloat)
    # maximum number of discrete values the sketch tracks
    # NOTE(review): exact sketch semantics live on the JVM side -- see isarn tdigest docs
    maxDiscrete = Param(Params._dummy(), "maxDiscrete", "maximum discrete values",
                        typeConverter=TypeConverters.toInt)

    def __init__(self):
        super(TDigestParams, self).__init__()

    def setDelta(self, value):
        """Set the t-digest compression parameter; returns self for chaining."""
        return self._set(delta=value)

    def getDelta(self):
        """Return the current (or default) value of ``delta``."""
        return self.getOrDefault(self.delta)

    def setMaxDiscrete(self, value):
        """Set the maximum number of discrete values; returns self for chaining."""
        return self._set(maxDiscrete=value)

    def getMaxDiscrete(self):
        """Return the current (or default) value of ``maxDiscrete``."""
        return self.getOrDefault(self.maxDiscrete)

class TDigestFIParams(TDigestParams, HasFeaturesCol):
    """
    Parameters for the :class:`TDigestFI` estimator: the shared t-digest
    params plus the standard ``featuresCol`` from :class:`HasFeaturesCol`.
    """

    def __init__(self):
        super(TDigestFIParams, self).__init__()

class TDigestFIModelParams(HasFeaturesCol):
    """
    Parameters for a fitted :class:`TDigestFIModel`: the predictive model
    being measured, the deviation measure to apply, optional feature names,
    and the names of the output columns produced by ``transform``.
    """

    # the fitted prediction model whose feature importances are computed;
    # stored as the raw JVM object via the toPredictionModel converter
    targetModel = Param(Params._dummy(), "targetModel", "predictive model",
                        typeConverter=toPredictionModel)
    # output column holding the feature name
    nameCol = Param(Params._dummy(), "nameCol", "feature name column",
                    typeConverter=TypeConverters.toString)
    # output column holding the importance score
    importanceCol = Param(Params._dummy(), "importanceCol", "feature importance column",
                          typeConverter=TypeConverters.toString)
    # which deviation measure to use when comparing model outputs (e.g. "rms-dev")
    deviationMeasure = Param(Params._dummy(), "deviationMeasure", "model output deviation measure",
                             typeConverter=TypeConverters.toString)
    # explicit feature names to report; when empty, names are chosen elsewhere
    # NOTE(review): fallback naming happens on the JVM side -- confirm
    featureNames = Param(Params._dummy(), "featureNames", "use these feature names",
                         typeConverter=TypeConverters.toListString)

    def __init__(self):
        super(TDigestFIModelParams, self).__init__()

    def setTargetModel(self, value):
        """Set the prediction model to measure; returns self for chaining."""
        return self._set(targetModel=value)

    def getTargetModel(self):
        """Return the current (or default) target model."""
        return self.getOrDefault(self.targetModel)

    def setNameCol(self, value):
        """Set the feature-name output column; returns self for chaining."""
        return self._set(nameCol=value)

    def getNameCol(self):
        """Return the current (or default) feature-name column."""
        return self.getOrDefault(self.nameCol)

    def setImportanceCol(self, value):
        """Set the importance output column; returns self for chaining."""
        return self._set(importanceCol=value)

    def getImportanceCol(self):
        """Return the current (or default) importance column."""
        return self.getOrDefault(self.importanceCol)

    def setDeviationMeasure(self, value):
        """Set the deviation measure name; returns self for chaining."""
        return self._set(deviationMeasure=value)

    def getDeviationMeasure(self):
        """Return the current (or default) deviation measure."""
        return self.getOrDefault(self.deviationMeasure)

    def setFeatureNames(self, value):
        """Set the explicit feature names; returns self for chaining."""
        return self._set(featureNames=value)

    def getFeatureNames(self):
        """Return the current (or default) feature names."""
        return self.getOrDefault(self.featureNames)

@inherit_doc
class TDigestFI(JavaEstimator, TDigestFIParams, JavaMLWritable, JavaMLReadable):
    """
    Feature Importance.

    Python wrapper for the JVM estimator
    ``org.isarnproject.pipelines.TDigestFI``; fitting yields a
    :class:`TDigestFIModel`.
    """

    @keyword_only
    def __init__(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"):
        """
        :param delta: t-digest compression parameter (default 0.5)
        :param maxDiscrete: maximum discrete values (default 0)
        :param featuresCol: input feature-vector column (default "features")
        """
        super(TDigestFI, self).__init__()
        # create the backing JVM estimator and set defaults to mirror the signature
        self._java_obj = self._new_java_obj("org.isarnproject.pipelines.TDigestFI", self.uid)
        self._setDefault(delta = 0.5, maxDiscrete = 0, featuresCol = "features")
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"):
        """Set params for this estimator; only explicitly passed kwargs are applied."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _create_model(self, java_model):
        """Wrap the fitted JVM model in the python :class:`TDigestFIModel`."""
        return TDigestFIModel(java_model)

class TDigestFIModel(JavaModel, TDigestFIModelParams, JavaMLWritable, JavaMLReadable):
    """
    Model fitted by :class:`TDigestFI`.

    ``transform`` produces a DataFrame of (feature name, importance) rows,
    using the configured ``targetModel`` and ``deviationMeasure``.
    """

    def __init__(self, java_model):
        """
        :param java_model: the fitted JVM model object to wrap
        """
        # models can't accept Params from __init__
        super(TDigestFIModel, self).__init__(java_model)
        self._setDefault(deviationMeasure = "auto", featureNames = [],
                         featuresCol = "features", nameCol = "name", importanceCol = "importance")

    @keyword_only
    def setParams(self, targetModel = None, deviationMeasure = "auto", featureNames = [],
                  featuresCol = "features", nameCol = "name", importanceCol = "importance"):
        """Set params for this model; only explicitly passed kwargs are applied."""
        kwargs = self._input_kwargs
        # @keyword_only records only the kwargs the caller actually passed, so
        # "targetModel" may be missing entirely -- the original
        # kwargs["targetModel"] lookup raised KeyError in that case. If it is
        # absent or explicitly None, don't try to (re)set it: the
        # toPredictionModel converter rejects None.
        if kwargs.get("targetModel", None) is None:
            kwargs.pop("targetModel", None)
        return self._set(**kwargs)
Loading

0 comments on commit 65447c1

Please sign in to comment.