From d2ae15577a96c1aadf06ce3be408c0c4df4441ec Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 26 Nov 2024 21:31:57 -0800 Subject: [PATCH 1/3] Example of HostUDF usage Signed-off-by: Nghia Truong --- .../sql/rapids/aggregate/aggregateFunctions.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index 3486edd3140..b01e5102281 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -21,6 +21,7 @@ import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, Group import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression +import com.nvidia.spark.rapids.jni.AggregationUtils import com.nvidia.spark.rapids.shims.{GpuDeterministicFirstLastCollectShim, ShimExpression, TypeUtilsShims} import com.nvidia.spark.rapids.window._ @@ -59,13 +60,14 @@ class CudfSum(override val dataType: DataType) extends CudfAggregate { // sum(shorts): bigint // Aggregate [sum(shorts#33) AS sum(shorts)#50L] // - @transient lazy val rapidsSumType: DType = GpuColumnVector.getNonNestedRapidsType(dataType) + //@transient lazy val rapidsSumType: DType = GpuColumnVector.getNonNestedRapidsType(dataType) override val reductionAggregate: cudf.ColumnVector => cudf.Scalar = - (col: cudf.ColumnVector) => col.sum(rapidsSumType) - + (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.hostUDF( + AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.Reduction)), DType.INT64) override lazy val groupByAggregate: GroupByAggregation = - GroupByAggregation.sum() + GroupByAggregation.hostUDF( + AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.GroupByAggregation)) override val name: String = "CudfSum" } From c17f00723cc539d40e53a6827139e0c1b777987b Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 26 Nov 2024 22:21:04 -0800 Subject: [PATCH 2/3] Fix type Signed-off-by: Nghia Truong --- .../apache/spark/sql/rapids/aggregate/aggregateFunctions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index b01e5102281..f5f5c87882f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -64,7 +64,8 @@ class CudfSum(override val dataType: DataType) extends CudfAggregate { override val reductionAggregate: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.hostUDF( - AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.Reduction)), DType.INT64) + AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.Reduction)), + DType.FLOAT64) override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.hostUDF( AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.GroupByAggregation)) From 4c5d91c528a250079527b3a1a8701c0ed856c3e9 Mon Sep 17 00:00:00 2001 From: Nghia Truong Date: Tue, 17 Dec 2024 21:04:58 -0800 Subject: [PATCH 3/3] Fix compile error Signed-off-by: Nghia Truong --- .../sql/rapids/aggregate/aggregateFunctions.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala index f5f5c87882f..4c0a456dc46 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/aggregateFunctions.scala @@ -21,7 +21,7 @@ import ai.rapids.cudf.{Aggregation128Utils, BinaryOp, ColumnVector, DType, Group import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression -import com.nvidia.spark.rapids.jni.AggregationUtils +import com.nvidia.spark.rapids.jni.TestHostUDF import com.nvidia.spark.rapids.shims.{GpuDeterministicFirstLastCollectShim, ShimExpression, TypeUtilsShims} import com.nvidia.spark.rapids.window._ @@ -64,11 +64,11 @@ class CudfSum(override val dataType: DataType) extends CudfAggregate { override val reductionAggregate: cudf.ColumnVector => cudf.Scalar = (col: cudf.ColumnVector) => col.reduce(ReductionAggregation.hostUDF( - AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.Reduction)), + TestHostUDF.createTestHostUDF(TestHostUDF.AggregationType.Reduction)), DType.FLOAT64) override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.hostUDF( - AggregationUtils.createTestHostUDF(AggregationUtils.AggregationType.GroupByAggregation)) + TestHostUDF.createTestHostUDF(TestHostUDF.AggregationType.GroupByAggregation)) override val name: String = "CudfSum" } @@ -2028,7 +2028,7 @@ case class GpuVarianceSamp(child: Expression, nullOnDivideByZero: Boolean) } case class GpuReplaceNullmask( - input: Expression, + input: Expression, mask: Expression) extends GpuExpression with ShimExpression { override def dataType: DataType = input.dataType @@ -2095,7 +2095,7 @@ abstract class GpuMaxMinByBase(valueExpr: Expression, orderingExpr: Expression) protected val cudfMaxMinByAggregate: CudfAggregate - private lazy val bufferOrdering: AttributeReference = + private lazy val bufferOrdering: AttributeReference = AttributeReference("ordering", orderingExpr.dataType)() private lazy val bufferValue: AttributeReference = @@ -2103,7 +2103,7 @@ abstract class GpuMaxMinByBase(valueExpr: Expression, orderingExpr: Expression) // Cudf allows only one column as input, so wrap value and ordering columns by // a struct before just going into cuDF. - private def createStructExpression(order: Expression, value: Expression): Expression = + private def createStructExpression(order: Expression, value: Expression): Expression = GpuReplaceNullmask( GpuCreateNamedStruct(Seq( GpuLiteral(CudfMaxMinBy.KEY_ORDERING, StringType), order,