Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix
Browse files Browse the repository at this point in the history
Chong Gao committed Oct 23, 2024

Unverified

This user has not yet uploaded their public signing key.
1 parent 35518d8 commit 1945192
Showing 2 changed files with 37 additions and 15 deletions.
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/arithmetic_ops_test.py
Original file line number Diff line number Diff line change
@@ -102,7 +102,7 @@ def test_hll():
assert_gpu_and_cpu_are_equal_sql(
lambda spark : spark.read.parquet("/home/chongg/a"),
"tab",
"select c1, APPROX_COUNT_DISTINCT(c1) from tab group by c1"
"select c1, APPROX_COUNT_DISTINCT(c2) from tab group by c1"
)

@pytest.mark.parametrize('data_gen', _arith_data_gens, ids=idfn)
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@

package org.apache.spark.sql.rapids.aggregate

import scala.collection.immutable.Seq

import ai.rapids.cudf
import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids._
@@ -25,24 +27,29 @@ import com.nvidia.spark.rapids.jni.HLL
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.catalyst.util.{GenericArrayData, HyperLogLogPlusPlusHelper}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class CudfHLL(override val dataType: DataType) extends CudfAggregate {
case class CudfHLL(override val dataType: DataType,
numRegistersPerSketch: Int) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) => input.reduce(ReductionAggregation.HLL(), DType.LIST)
override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.HLL(32 * 1024)
(input: cudf.ColumnVector) => input.reduce(
ReductionAggregation.HLL(numRegistersPerSketch), DType.STRUCT)
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.HLL(numRegistersPerSketch)
override val name: String = "CudfHLL"
}

case class CudfMergeHLL(override val dataType: DataType)
case class CudfMergeHLL(override val dataType: DataType,
numRegistersPerSketch: Int)
extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) =>
input.reduce(ReductionAggregation.mergeHLL(), DType.LIST)
input.reduce(ReductionAggregation.mergeHLL(numRegistersPerSketch), DType.STRUCT)

override lazy val groupByAggregate: GroupByAggregation = GroupByAggregation.mergeHLL()
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.mergeHLL(numRegistersPerSketch)
override val name: String = "CudfMergeHLL"
}

@@ -67,27 +74,42 @@ case class GpuHLLEvaluation(childExpr: Expression, precision: Int)
}
}

case class GpuHLL(childExpr: Expression, precision: Int)
case class GpuHLL(childExpr: Expression, relativeSD: Double)
extends GpuAggregateFunction with Serializable {

// specify the HLL sketch type: list<byte>
private lazy val hllBufferType: DataType = ArrayType(ByteType, containsNull = false)
// Consistent with Spark
private lazy val numRegistersPerSketch: Int =
1 << Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt

// specify the HLL sketch type: struct<long, ..., long>
private lazy val hllBufferType: DataType = StructType.fromAttributes(aggBufferAttributes)

private lazy val hllBufferAttribute: AttributeReference =
AttributeReference("hllAttr", hllBufferType)()

// TODO: should be long array literal
override lazy val initialValues: Seq[Expression] =
Seq(GpuLiteral.create(new GenericArrayData(Array.ofDim[Byte](32 * 1024)), hllBufferType))

override lazy val inputProjection: Seq[Expression] = Seq(childExpr)

override lazy val updateAggregates: Seq[CudfAggregate] = Seq(CudfHLL(hllBufferType))
override lazy val updateAggregates: Seq[CudfAggregate] =
Seq(CudfHLL(hllBufferType, numRegistersPerSketch))

override lazy val mergeAggregates: Seq[CudfAggregate] =
Seq(CudfMergeHLL(hllBufferType, numRegistersPerSketch))

override lazy val mergeAggregates: Seq[CudfAggregate] = Seq(CudfMergeHLL(hllBufferType))
override lazy val evaluateExpression: Expression =
GpuHLLEvaluation(hllBufferAttribute, numRegistersPerSketch)

override lazy val evaluateExpression: Expression = GpuHLLEvaluation(hllBufferAttribute, precision)
private val hllppHelper = new HyperLogLogPlusPlusHelper(relativeSD)

override def aggBufferAttributes: Seq[AttributeReference] = hllBufferAttribute :: Nil
/** Allocate enough words to store all registers. */
override val aggBufferAttributes: Seq[AttributeReference] = {
Seq.tabulate(hllppHelper.numWords) { i =>
AttributeReference(s"MS[$i]", LongType)()
}
}

override def dataType: DataType = hllBufferType

0 comments on commit 1945192

Please sign in to comment.