diff --git a/README.md b/README.md index 61af552..ac8e7d0 100644 --- a/README.md +++ b/README.md @@ -337,3 +337,90 @@ scala> td.show() |TDigestArraySQL([...| +--------------------+ ``` + +### Compute feature importance with respect to a predictive model +```scala +scala> :paste +// Entering paste mode (ctrl-D to finish) + +import org.apache.spark.ml.regression.LinearRegression + +val training = spark.read.format("libsvm") + .load("data/mllib/sample_linear_regression_data.txt") + +val lr = new LinearRegression() + .setMaxIter(10) + .setRegParam(0.3) + .setElasticNetParam(0.8) + +val lrModel = lr.fit(training) + +import org.isarnproject.pipelines.{TDigestFI,TDigestFIModel} + +val fi = new TDigestFI().setDelta(0.3).setMaxDiscrete(10) + +val fiMod = fi.fit(training) + .setTargetModel(lrModel) + .setDeviationMeasure("rms-dev") + .setFeatureNames(Array.tabulate(10){j=>s"x$j"}) + +val imp = fiMod.transform(training) + +// Exiting paste mode, now interpreting. + +import org.apache.spark.ml.regression.LinearRegression +training: org.apache.spark.sql.DataFrame = [label: double, features: vector] +lr: org.apache.spark.ml.regression.LinearRegression = linReg_ad8ebef9cfe8 +lrModel: org.apache.spark.ml.regression.LinearRegressionModel = linReg_ad8ebef9cfe8 +import org.isarnproject.pipelines.{TDigestFI, TDigestFIModel} +fi: org.isarnproject.pipelines.TDigestFI = TDigestFI_67b1cff93349 +fiMod: org.isarnproject.pipelines.TDigestFIModel = TDigestFI_67b1cff93349 +imp: org.apache.spark.sql.DataFrame = [name: string, importance: double] + +scala> imp.show ++----+-------------------+ +|name| importance| ++----+-------------------+ +| x0| 0.0| +| x1|0.27093413867331134| +| x2|0.27512986364699304| +| x3| 1.4284480425303374| +| x4|0.04472982597939822| +| x5| 0.5981079647203551| +| x6| 0.0| +| x7|0.11970670592684969| +| x8| 0.1668815037423663| +| x9|0.17970574939101025| ++----+-------------------+ +``` + +### Compute feature importance with respect to a predictive model (python) +```python +>>> from pyspark.ml.regression import LinearRegression +>>> training = spark.read.format("libsvm") \ +... .load("data/mllib/sample_linear_regression_data.txt") +>>> lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) +>>> lrModel = lr.fit(training) +>>> from isarnproject.pipelines.fi import * +>>> fi = TDigestFI().setDelta(0.3).setMaxDiscrete(10) +>>> fiMod = fi.fit(training) \ +... .setTargetModel(lrModel) \ +... .setDeviationMeasure("rms-dev") \ +... 
.setFeatureNames(["x%d" % (j) for j in xrange(10)]) +>>> imp = fiMod.transform(training) +>>> imp.show() ++----+-------------------+ +|name| importance| ++----+-------------------+ +| x0| 0.0| +| x1| 0.2513147892886899| +| x2|0.28992477834838837| +| x3| 1.4906022974248356| +| x4|0.04197189119745892| +| x5| 0.6213459845972947| +| x6| 0.0| +| x7|0.12463038543257152| +| x8|0.17144699470039335| +| x9|0.18428188512840307| ++----+-------------------+ +``` diff --git a/build.sbt b/build.sbt index 1ba0a87..80194fd 100644 --- a/build.sbt +++ b/build.sbt @@ -4,11 +4,11 @@ organization := "org.isarnproject" bintrayOrganization := Some("isarn") -val packageVersion = "0.2.0" +val packageVersion = "0.3.0-SNAPSHOT" val sparkVersion = "2.2.0" -val pythonVersion = "3.5" +val pythonVersion = "2.7" val sparkSuffix = s"""sp${sparkVersion.split('.').take(2).mkString(".")}""" @@ -124,6 +124,8 @@ compilePython <<= compilePython.dependsOn(deletePYC) mappings in (Compile, packageBin) ++= Seq( (baseDirectory.value / "python" / "isarnproject" / "__init__.pyc") -> "isarnproject/__init__.pyc", + (baseDirectory.value / "python" / "isarnproject" / "pipelines" / "__init__.pyc") -> "isarnproject/pipelines/__init__.pyc", + (baseDirectory.value / "python" / "isarnproject" / "pipelines" / "fi.pyc") -> "isarnproject/pipelines/fi.pyc", (baseDirectory.value / "python" / "isarnproject" / "sketches" / "__init__.pyc") -> "isarnproject/sketches/__init__.pyc", (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "__init__.pyc") -> "isarnproject/sketches/udaf/__init__.pyc", (baseDirectory.value / "python" / "isarnproject" / "sketches" / "udaf" / "tdigest.pyc") -> "isarnproject/sketches/udaf/tdigest.pyc", diff --git a/python/isarnproject/pipelines/__init__.py b/python/isarnproject/pipelines/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python/isarnproject/pipelines/fi.py b/python/isarnproject/pipelines/fi.py new file mode 100644 index 0000000..628d847 --- /dev/null +++ b/python/isarnproject/pipelines/fi.py @@ -0,0 +1,126 @@ +from pyspark import since, keyword_only +from pyspark.ml.param.shared import * +from pyspark.ml.util import * +from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper +from pyspark.ml.common import inherit_doc +from pyspark.sql import DataFrame + +__all__ = ['TDigestFI', 'TDigestFIModel'] + +def toPredictionModel(value): + if isinstance(value, JavaPredictionModel): + return value._java_obj + else: + raise TypeError("object %s was not a JavaPredictionModel" % (value)) + +class TDigestParams(Params): + delta = Param(Params._dummy(), "delta", "tdigest compression parameter", + typeConverter=TypeConverters.toFloat) + maxDiscrete = Param(Params._dummy(), "maxDiscrete", "maximum discrete values", + typeConverter=TypeConverters.toInt) + + def __init__(self): + super(TDigestParams, self).__init__() + + def setDelta(self, value): + return self._set(delta=value) + + def getDelta(self): + return self.getOrDefault(self.delta) + + def setMaxDiscrete(self, value): + return self._set(maxDiscrete=value) + + def getMaxDiscrete(self): + return self.getOrDefault(self.maxDiscrete) + +class TDigestFIParams(TDigestParams, HasFeaturesCol): + def __init__(self): + super(TDigestFIParams, self).__init__() + +class TDigestFIModelParams(HasFeaturesCol): + targetModel = Param(Params._dummy(), "targetModel", "predictive model", + typeConverter=toPredictionModel) + nameCol = Param(Params._dummy(), "nameCol", "feature name column", + typeConverter=TypeConverters.toString) + importanceCol = 
Param(Params._dummy(), "importanceCol", "feature importance column", + typeConverter=TypeConverters.toString) + deviationMeasure = Param(Params._dummy(), "deviationMeasure", "model output deviation measure", + typeConverter=TypeConverters.toString) + featureNames = Param(Params._dummy(), "featureNames", "use these feature names", + typeConverter=TypeConverters.toListString) + + def __init__(self): + super(TDigestFIModelParams, self).__init__() + + def setTargetModel(self, value): + return self._set(targetModel=value) + + def getTargetModel(self): + return self.getOrDefault(self.targetModel) + + def setNameCol(self, value): + return self._set(nameCol=value) + + def getNameCol(self): + return self.getOrDefault(self.nameCol) + + def setImportanceCol(self, value): + return self._set(importanceCol=value) + + def getImportanceCol(self): + return self.getOrDefault(self.importanceCol) + + def setDeviationMeasure(self, value): + return self._set(deviationMeasure=value) + + def getDeviationMeasure(self): + return self.getOrDefault(self.deviationMeasure) + + def setFeatureNames(self, value): + return self._set(featureNames=value) + + def getFeatureNames(self): + return self.getOrDefault(self.featureNames) + +@inherit_doc +class TDigestFI(JavaEstimator, TDigestFIParams, JavaMLWritable, JavaMLReadable): + """ + Feature Importance. + """ + + @keyword_only + def __init__(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"): + super(TDigestFI, self).__init__() + self._java_obj = self._new_java_obj("org.isarnproject.pipelines.TDigestFI", self.uid) + self._setDefault(delta = 0.5, maxDiscrete = 0, featuresCol = "features") + kwargs = self._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, delta = 0.5, maxDiscrete = 0, featuresCol = "features"): + kwargs = self._input_kwargs + return self._set(**kwargs) + + def _create_model(self, java_model): + return TDigestFIModel(java_model) + +class TDigestFIModel(JavaModel, TDigestFIModelParams, JavaMLWritable, JavaMLReadable): + """ + Model fitted by :class:`TDigestFI`. + """ + + def __init__(self, java_model): + # models can't accept Params from __init__ + super(TDigestFIModel, self).__init__(java_model) + self._setDefault(deviationMeasure = "auto", featureNames = [], + featuresCol = "features", nameCol = "name", importanceCol = "importance") + + @keyword_only + def setParams(self, targetModel = None, deviationMeasure = "auto", featureNames = [], + featuresCol = "features", nameCol = "name", importanceCol = "importance"): + kwargs = self._input_kwargs + # if targetModel wasn't provided then don't try to (re)set the value, it will fail + if kwargs["targetModel"] is None: + del kwargs["targetModel"] + return self._set(**kwargs) diff --git a/src/main/scala/org/isarnproject/pipelines/TDigestFI.scala b/src/main/scala/org/isarnproject/pipelines/TDigestFI.scala new file mode 100644 index 0000000..72e44de --- /dev/null +++ b/src/main/scala/org/isarnproject/pipelines/TDigestFI.scala @@ -0,0 +1,309 @@ +/* +Copyright 2017 Erik Erlandson +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.isarnproject.pipelines + +import scala.reflect.ClassTag +import scala.collection.mutable.WrappedArray + +import org.apache.spark.ml.{Estimator, Model, PredictionModel} +import org.apache.spark.ml.classification.ClassificationModel +import org.apache.spark.ml.regression.RegressionModel +import org.apache.spark.ml.param._ +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Dataset, DataFrame} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{Vector => MLVector, + DenseVector => MLDense, SparseVector => MLSparse} +import org.apache.spark.sql.expressions.MutableAggregationBuffer +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction +import org.apache.spark.sql.Row + +import org.isarnproject.sketches.TDigest +import org.apache.spark.isarnproject.sketches.udt._ +import org.isarnproject.sketches.udaf._ + +// Defining these in a subpackage so the package can have other +// param definitions added to it elsewhere. I'm keeping them visible +// so other packages can use them in the future if there is a use +package params { + trait HasFeaturesCol extends Params with DefaultParamsWritable { + + /** + * Column containing feature vectors. Expected type is ML Vector. + * Defaults to "features" + * @group param + */ + final val featuresCol: Param[String] = + new Param[String](this, "featuresCol", "feature column name") + setDefault(featuresCol, "features") + final def getFeaturesCol: String = $(featuresCol) + final def setFeaturesCol(value: String): this.type = set(featuresCol, value) + } + + trait TDigestParams extends Params with DefaultParamsWritable { + /** + * TDigest compression parameter. + * Defaults to 0.5 + * @group param + */ + final val delta: DoubleParam = + new DoubleParam(this, "delta", "t-digest compression (> 0)", ParamValidators.gt(0.0)) + setDefault(delta, org.isarnproject.sketches.TDigest.deltaDefault) + final def getDelta: Double = $(delta) + final def setDelta(value: Double): this.type = set(delta, value) + + /** + * Maximum number of discrete values to sketch before transitioning to continuous mode + * Defaults to 0 + * @group param + */ + final val maxDiscrete: IntParam = + new IntParam(this, "maxDiscrete", "maximum unique discrete values (>= 0)", + ParamValidators.gtEq(0)) + setDefault(maxDiscrete, 0) + final def getMaxDiscrete: Int = $(maxDiscrete) + final def setMaxDiscrete(value: Int): this.type = set(maxDiscrete, value) + } + + trait TDigestFIParams extends Params with TDigestParams with HasFeaturesCol + + trait TDigestFIModelParams extends Params + with HasFeaturesCol with DefaultParamsWritable { + + /** + * A predictive model to compute variable importances against. + * No default. + * @group param + */ + final val targetModel: Param[AnyRef] = + new Param[AnyRef](this, "targetModel", "predictive model") + // no default for this + final def getTargetModel: AnyRef = $(targetModel) + final def setTargetModel(value: AnyRef): this.type = { + if (!inheritances(value).contains("PredictionModel")) { + throw new Exception("model must be a subclass of PredictionModel") + } + set(targetModel, value) + } + + /** + * Column name to use for feature names. 
+ * Defaults to "name" + * @group param + */ + final val nameCol: Param[String] = + new Param[String](this, "nameCol", "column for names of features") + setDefault(nameCol, "name") + final def getNameCol: String = $(nameCol) + final def setNameCol(value: String): this.type = set(nameCol, value) + + /** + * Column name to use for feature importances. + * Defaults to "importance" + * @group param + */ + final val importanceCol: Param[String] = + new Param[String](this, "importanceCol", "column for feature importance values") + setDefault(importanceCol, "importance") + final def getImportanceCol: String = $(importanceCol) + final def setImportanceCol(value: String): this.type = set(importanceCol, value) + + /** + * Function to measure the change resulting from randomizing a feature value. + * Defaults to "auto" (detects whether model is regression or classification). + * Options: "auto", "dev-rate" (class), "abs-dev" (reg), "rms-dev" (reg) + * @group param + */ + final val deviationMeasure: Param[String] = + new Param[String](this, "deviationMeasure", "deviation measure to apply") + setDefault(deviationMeasure, "auto") + final def getDeviationMeasure: String = $(deviationMeasure) + final def setDeviationMeasure(value: String): this.type = set(deviationMeasure, value) + + /** + * Names to use for features. + * Defaults to f1, f2, ... + * @group param + */ + final val featureNames: StringArrayParam = + new StringArrayParam(this, "featureNames", "assume these feature names") + setDefault(featureNames, Array.empty[String]) + final def getFeatureNames: Array[String] = $(featureNames) + final def setFeatureNames(value: Array[String]): this.type = set(featureNames, value) + } +} + +import params._ + +/** + * Model/Transformer for transforming input feature data into a DataFrame containing + * "name" and "importance" columns, mapping feature name to its computed importance. + */ +class TDigestFIModel( + override val uid: String, + featTD: Array[TDigest], + spark: SparkSession + ) extends Model[TDigestFIModel] with TDigestFIModelParams { + + private val featTDBC = spark.sparkContext.broadcast(featTD) + + private def deviation: (Double, Double) => Double = $(deviationMeasure) match { + case "mean-abs-dev" => (x1: Double, x2: Double) => math.abs(x1 - x2) + case "rms-dev" => (x1: Double, x2: Double) => math.pow(x1 - x2, 2) + case "dev-rate" => (x1: Double, x2: Double) => if (x1 != x2) 1.0 else 0.0 + case "auto" => { + inheritances($(targetModel)) match { + case ih if ih.contains("RegressionModel") => + (x1: Double, x2: Double) => math.abs(x1 - x2) + case ih if ih.contains("ClassificationModel") => + (x1: Double, x2: Double) => if (x1 != x2) 1.0 else 0.0 + case _ => + throw new Exception(s"bad model class ${this.getTargetModel.getClass.getSimpleName}") + } + } + case _ => throw new Exception(s"bad deviation measure ${this.getDeviationMeasure}") + } + + override def copy(extra: ParamMap): TDigestFIModel = ??? 
+ + def transformSchema(schema: StructType): StructType = { + require(schema.fieldNames.contains($(featuresCol))) + schema($(featuresCol)) match { + case sf: StructField => require(sf.dataType.equals(TDigestUDTInfra.udtVectorML)) + } + + // Output is two columns: feature names and corresponding importances + StructType(Seq( + StructField($(nameCol), StringType, false), + StructField($(importanceCol), DoubleType, false) + )) + } + + def transform(data: Dataset[_]): DataFrame = { + transformSchema(data.schema, logging = true) + val modelBC = spark.sparkContext.broadcast($(targetModel)) + val fdev = deviation + val (n, imp) = data.select(col($(featuresCol))).rdd.mapPartitions { (fvp: Iterator[Row]) => + val ftd = featTDBC.value + val model = modelBC.value + // The 'predict' method is part of the generic PredictionModel interface, + // however it is protected, so I have to force the issue using reflection. + // 'Method' is not serializable, so I have to do it inside this map + val predictMethod = model.getClass.getDeclaredMethods.find(_.getName == "predict").get + predictMethod.setAccessible(true) + val m = ftd.length + val (n, dev) = fvp.foldLeft((0L, Array.fill(m)(0.0))) { case ((n, dev), Row(fv: MLVector)) => + val farr = fv.toArray + require(farr.length == m, "bad feature vector length ${farr.length}") + // Declaring a dense vector around 'farr' allows me to overwrite individual + // feature values below. This works because dense vec just wraps the underlying + // array value. If the implementation of dense vec changes, this could break, + // although it seems unlikely. + val fvec = (new MLDense(farr)).asInstanceOf[AnyRef] + val refpred = predictMethod.invoke(model, fvec).asInstanceOf[Double] + for { j <- 0 until m } { + val t = farr(j) + farr(j) = ftd(j).sample + val pred = predictMethod.invoke(model, fvec).asInstanceOf[Double] + farr(j) = t + dev(j) += fdev(refpred, pred) + } + (n + 1, dev) + } + Iterator((n, dev)) + }.reduce { case ((n1, dev1), (n2, dev2)) => + require(dev1.length == dev2.length, "mismatched deviation vector sizes") + for { j <- 0 until dev1.length } { dev1(j) += dev2(j) } + (n1 + n2, dev1) + } + val nD = n.toDouble + for { j <- 0 until imp.length } { imp(j) /= nD } + val importances = if ($(deviationMeasure) != "rms-dev") imp else imp.map { x => math.sqrt(x) } + val featNames: Seq[String] = + if ($(featureNames).length > 0) { + $(featureNames) + } else { + (1 to featTD.length).map { j => s"f$j" } + } + require(featNames.length == featTD.length, s"expecting ${featTD.length} feature names") + modelBC.unpersist + spark.createDataFrame(featNames.zip(importances)).toDF($(nameCol), $(importanceCol)) + } + + override def finalize(): Unit = { + featTDBC.unpersist + super.finalize() + } +} + +/** + * An Estimator for creating a TDigestFI model from feature data. + */ +class TDigestFI(override val uid: String) extends Estimator[TDigestFIModel] with TDigestFIParams { + + def this() = this(Identifiable.randomUID("TDigestFI")) + + override def copy(extra: ParamMap): Estimator[TDigestFIModel] = ??? + + def transformSchema(schema: StructType): StructType = { + require(schema.fieldNames.contains($(featuresCol))) + schema($(featuresCol)) match { + case sf: StructField => require(sf.dataType.equals(TDigestUDTInfra.udtVectorML)) + } + // I can't figure out the purpose for outputting a modified schema here. + // Until further notice I'm going to output an empty one. 
+ StructType(Seq.empty[StructField]) + } + + def fit(data: Dataset[_]): TDigestFIModel = { + val tds = data.select(col($(featuresCol))).rdd + .aggregate(Array.empty[TDigest])({ case (ttd, Row(fv: MLVector)) => + val m = fv.size + val td = + if (ttd.length > 0) ttd else Array.fill(m)(TDigest.empty($(delta), $(maxDiscrete))) + require(td.length == m, "Inconsistent feature vector size $m") + fv match { + case v: MLSparse => + var jBeg = 0 + v.foreachActive((j, x) => { + for { k <- jBeg until j } { td(k) += 0.0 } + td(j) += x + jBeg = j + 1 + }) + for { k <- jBeg until v.size } { td(k) += 0.0 } + case _ => + for { j <- 0 until fv.size } { td(j) += fv(j) } + } + td + }, + (td1, td2) => + if (td1.length == 0) { + td2 + } else if (td2.length == 0) { + td1 + } else { + require(td1.length == td2.length, "mismatched t-digest arrays") + for { j <- 0 until td1.length } { + td1(j) ++= td2(j) + } + td1 + }) + val model = new TDigestFIModel(uid, tds, data.sparkSession) + model.setParent(this) + model + } +} diff --git a/src/main/scala/org/isarnproject/pipelines/package.scala b/src/main/scala/org/isarnproject/pipelines/package.scala new file mode 100644 index 0000000..fcacf0b --- /dev/null +++ b/src/main/scala/org/isarnproject/pipelines/package.scala @@ -0,0 +1,29 @@ +/* +Copyright 2017 Erik Erlandson +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package org.isarnproject + +import scala.language.existentials + +package object pipelines { + private[pipelines] def inheritances(obj: Any): Seq[String] = { + var ih: List[Class[_]] = Nil + var sc = obj.getClass.getSuperclass + while (sc != null) { + ih = sc :: ih + sc = sc.getSuperclass + } + ih ++= obj.getClass.getInterfaces + ih.map(_.getSimpleName) + } +}
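
For intuition, below is a minimal, self-contained sketch of the randomization loop at the core of `TDigestFIModel.transform`. A toy linear `predict` function and uniform random draws stand in for the target model and the per-feature t-digest samples; both are illustrative assumptions, not part of the library.

```scala
// Sketch of randomization-based feature importance, assuming a toy linear model
// and uniform sampling in place of per-feature t-digest samples.
object FISketch extends App {
  val rng = new scala.util.Random(37)

  // toy stand-in for the target model: a fixed linear function of three features
  val weights = Array(0.0, 1.0, 3.0)
  def predict(x: Array[Double]): Double = weights.zip(x).map { case (w, v) => w * v }.sum

  // toy feature rows; in the pipeline these come from the features column
  val rows = Array.fill(1000) { Array.fill(weights.length)(rng.nextDouble()) }

  val dev = Array.fill(weights.length)(0.0)
  for (row <- rows) {
    val ref = predict(row)                    // reference prediction
    for (j <- weights.indices) {
      val saved = row(j)
      row(j) = rng.nextDouble()               // stand-in for ftd(j).sample
      dev(j) += math.abs(ref - predict(row))  // "mean-abs-dev" deviation measure
      row(j) = saved                          // restore the original value
    }
  }
  val importance = dev.map(_ / rows.length)

  // features with larger weights perturb the prediction more, so they score higher
  println(importance.mkString(", "))
}
```

Running this should report importances ordered like the weights (third feature highest, first near zero), mirroring how the Spark implementation averages the accumulated deviations over all rows and, for "rms-dev", takes a square root at the end.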