Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add support for Hyper Log Log PLus Plus(HLL++) #11638

Draft
wants to merge 6 commits into
base: branch-25.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/hashing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

_struct_of_xxhash_gens = StructGen([(f"c{i}", g) for i, g in enumerate(_atomic_gens)])

# will be used by HyperLogLogPlusPLus(approx_count_distinct)
# This is also used by HyperLogLogPlusPLus(approx_count_distinct)
xxhash_gens = (_atomic_gens + [_struct_of_xxhash_gens] + single_level_array_gens
+ nested_array_gens_sample + [
all_basic_struct_gen,
Expand Down
38 changes: 38 additions & 0 deletions integration_tests/src/main/python/hyper_log_log_plus_plus_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from hashing_test import xxhash_gens
from marks import ignore_order


@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', xxhash_gens, ids=idfn)
def test_hllpp_groupby(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark: gen_df(spark, [("c1", int_gen), ("c2", data_gen)]),
"tab",
"select c1, APPROX_COUNT_DISTINCT(c2) from tab group by c1")


@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', xxhash_gens, ids=idfn)
def test_hllpp_reduction(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark: unary_op_df(spark, data_gen),
"tab",
"select APPROX_COUNT_DISTINCT(a) from tab")
Original file line number Diff line number Diff line change
Expand Up @@ -3988,6 +3988,36 @@ object GpuOverrides extends Logging {
GpuDynamicPruningExpression(child)
}
}),
expr[HyperLogLogPlusPlus](
"Aggregation approximate count distinct",
ExprChecks.reductionAndGroupByAgg(TypeSig.LONG, TypeSig.LONG,
// HyperLogLogPlusPlus depends on Xxhash64
// HyperLogLogPlusPlus supports all the types that Xxhash 64 supports
Seq(ParamCheck("input",XxHash64Shims.supportedTypes, TypeSig.all))),
(a, conf, p, r) => new UnaryExprMeta[HyperLogLogPlusPlus](a, conf, p, r) {

// It's the same as Xxhash64
override def tagExprForGpu(): Unit = {
val maxDepth = a.children.map(
c => XxHash64Utils.computeMaxStackSize(c.dataType)).max
if (maxDepth > Hash.MAX_STACK_DEPTH) {
willNotWorkOnGpu(s"The data type requires a stack depth of $maxDepth, " +
s"which exceeds the GPU limit of ${Hash.MAX_STACK_DEPTH}. " +
"The algorithm to calculate stack depth: " +
"1: Primitive type counts 1 depth; " +
"2: Array of Structure counts: 1 + depthOf(Structure); " +
"3: Array of Other counts: depthOf(Other); " +
"4: Structure counts: 1 + max of depthOf(child); " +
"5: Map counts: 2 + max(depthOf(key), depthOf(value)); "
)
}
}

override def convertToGpu(child: Expression): GpuExpression = {
GpuHyperLogLogPlusPlus(child, a.relativeSD)
}
}
),
SparkShimImpl.ansiCastRule
).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.aggregate

import scala.collection.immutable.Seq

import ai.rapids.cudf
import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation}
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.Arm.{withResource, withResourceIfAllowed}
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression
import com.nvidia.spark.rapids.jni.HyperLogLogPlusPlusHostUDF
import com.nvidia.spark.rapids.shims.ShimExpression

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.rapids.{GpuCreateNamedStruct, GpuGetStructField}
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class CudfHLLPP(override val dataType: DataType,
precision: Int) extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) => {
if (input.getNullCount == input.getRowCount) {
// For NullType column or all values are null,
// return a struct scalar: struct(0L, 0L, ..., 0L)
val numCols = (1 << precision) / 10 + 1
withResource(cudf.ColumnVector.fromLongs(0L)) { cv =>
// Underlying uses deep-copy, so we can reuse this `cv` and fill multiple times.
val cvs: Array[cudf.ColumnView] = Array.fill(numCols)(cv)
cudf.Scalar.structFromColumnViews(cvs: _*)
}
} else {
input.reduce(
ReductionAggregation.hostUDF(
HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
HyperLogLogPlusPlusHostUDF.AggregationType.Reduction, precision)),
DType.STRUCT)
}
}
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.hostUDF(
HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
HyperLogLogPlusPlusHostUDF.AggregationType.GroupBy, precision)
)
override val name: String = "CudfHyperLogLogPlusPlus"
}

case class CudfMergeHLLPP(override val dataType: DataType,
precision: Int)
extends CudfAggregate {
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar =
(input: cudf.ColumnVector) => input.reduce(
ReductionAggregation.hostUDF(
HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
HyperLogLogPlusPlusHostUDF.AggregationType.Reduction_MERGE, precision)),
DType.STRUCT)
override lazy val groupByAggregate: GroupByAggregation =
GroupByAggregation.hostUDF(
HyperLogLogPlusPlusHostUDF.createHLLPPHostUDF(
HyperLogLogPlusPlusHostUDF.AggregationType.GroupByMerge, precision)
)
override val name: String = "CudfMergeHyperLogLogPlusPlus"
}

/**
* Perform the final evaluation step to compute approximate count distinct from sketches.
* Input is long columns, first construct struct of long then feed to cuDF
*/
case class GpuHyperLogLogPlusPlusEvaluation(childExpr: Expression,
precision: Int)
extends GpuExpression with ShimExpression {
override def dataType: DataType = LongType

override def nullable: Boolean = false

override def prettyName: String = "HyperLogLogPlusPlus_evaluation"

override def children: scala.Seq[Expression] = Seq(childExpr)

override def columnarEval(batch: ColumnarBatch): GpuColumnVector = {
withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches =>
val distinctValues = HyperLogLogPlusPlusHostUDF.estimateDistinctValueFromSketches(
sketches.getBase, precision)
GpuColumnVector.from(distinctValues, LongType)
}
}
}

/**
* Gpu version of HyperLogLogPlusPlus
* Spark APPROX_COUNT_DISTINCT on NULLs returns zero
*/
case class GpuHyperLogLogPlusPlus(childExpr: Expression, relativeSD: Double)
extends GpuAggregateFunction with Serializable {

// Consistent with Spark
private lazy val precision: Int =
Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt;

private lazy val numRegistersPerSketch: Int = 1 << precision;

// Each long contains 10 register(max 6 bits)
private lazy val numWords = numRegistersPerSketch / 10 + 1

// Spark agg buffer type: long array
private lazy val sparkAggBufferAttributes: Seq[AttributeReference] = {
Seq.tabulate(numWords) { i =>
AttributeReference(s"MS[$i]", LongType)()
}
}

/**
* Spark uses long columns to save agg buffer, e.g.: long[52]
* Each long compacts multiple registers to save memory
*/
override val aggBufferAttributes: Seq[AttributeReference] = sparkAggBufferAttributes

/**
* init long array with all zero
*/
override lazy val initialValues: Seq[Expression] = Seq.tabulate(numWords) { _ =>
GpuLiteral(0L, LongType)
}

override lazy val inputProjection: Seq[Expression] = Seq(childExpr)

/**
* cuDF HLLPP sketch type: struct<long, ..., long>
*/
private lazy val cuDFBufferType: DataType = StructType.fromAttributes(aggBufferAttributes)

/**
* cuDF uses Struct<long, ..., long> column to do aggregate
*/
override lazy val updateAggregates: Seq[CudfAggregate] =
Seq(CudfHLLPP(cuDFBufferType, precision))

/**
* Convert long columns to Struct<long, ..., long> column
*/
private def genStruct: Seq[Expression] = {
val names = Seq.tabulate(numWords) { i => GpuLiteral(s"MS[$i]", StringType) }
Seq(GpuCreateNamedStruct(names.zip(aggBufferAttributes).flatten { case (a, b) => List(a, b) }))
}

/**
* Convert Struct<long, ..., long> column to long columns
*/
override lazy val postUpdate: Seq[Expression] = Seq.tabulate(numWords) {
i => GpuGetStructField(postUpdateAttr.head, i)
}

/**
* convert to Struct<long, ..., long>
*/
override lazy val preMerge: Seq[Expression] = genStruct

override lazy val mergeAggregates: Seq[CudfAggregate] =
Seq(CudfMergeHLLPP(cuDFBufferType, precision))

/**
* Convert Struct<long, ..., long> column to long columns
*/
override lazy val postMerge: Seq[Expression] = Seq.tabulate(numWords) {
i => GpuGetStructField(postMergeAttr.head, i)
}

override lazy val evaluateExpression: Expression =
GpuHyperLogLogPlusPlusEvaluation(genStruct.head, precision)

override def dataType: DataType = LongType

// Spark APPROX_COUNT_DISTINCT on NULLs returns zero
override def nullable: Boolean = false

override def prettyName: String = "approx_count_distinct"

override def children: Seq[Expression] = Seq(childExpr)
}
1 change: 1 addition & 0 deletions tools/generated_files/330/operatorsScore.csv
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ HiveGenericUDF,4
HiveHash,4
HiveSimpleUDF,4
Hour,4
HyperLogLogPlusPlus,4
Hypot,4
If,4
In,4
Expand Down
4 changes: 4 additions & 0 deletions tools/generated_files/330/supportedExprs.csv
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,10 @@ First,S,`first_value`; `first`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,N
First,S,`first_value`; `first`,None,reduction,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
First,S,`first_value`; `first`,None,window,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
First,S,`first_value`; `first`,None,window,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
HyperLogLogPlusPlus,S,`approx_count_distinct`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,NS,NS,NS,NS,NS,NS
HyperLogLogPlusPlus,S,`approx_count_distinct`,None,aggregation,result,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
HyperLogLogPlusPlus,S,`approx_count_distinct`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,NS,S,NS,NS,NS,NS,NS,NS,NS
HyperLogLogPlusPlus,S,`approx_count_distinct`,None,reduction,result,NA,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
Last,S,`last_value`; `last`,None,aggregation,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
Last,S,`last_value`; `last`,None,aggregation,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
Last,S,`last_value`; `last`,None,reduction,input,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
Expand Down
Loading