-
Notifications
You must be signed in to change notification settings - Fork 242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Add support for Hyper Log Log PLus Plus(HLL++) #11638
Draft
res-life
wants to merge
6
commits into
NVIDIA:branch-25.02
Choose a base branch
from
res-life:hll
base: branch-25.02
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 1 commit
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
32 changes: 32 additions & 0 deletions
32
integration_tests/src/main/python/hyper_log_log_plus_plus_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Copyright (c) 2021-2024, NVIDIA CORPORATION. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import pytest | ||
|
||
from asserts import assert_gpu_and_cpu_are_equal_sql | ||
from data_gen import * | ||
|
||
@pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens, ids=idfn) | ||
def test_hllpp_groupby(data_gen): | ||
assert_gpu_and_cpu_are_equal_sql( | ||
lambda spark : gen_df(spark, [("c1", int_gen), ("c2", data_gen)]), | ||
"tab", | ||
"select c1, APPROX_COUNT_DISTINCT(c2) from tab group by c1") | ||
|
||
@pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens, ids=idfn) | ||
def test_hllpp_reduction(data_gen): | ||
assert_gpu_and_cpu_are_equal_sql( | ||
lambda spark : unary_op_df(spark, data_gen), | ||
"tab", | ||
"select APPROX_COUNT_DISTINCT(a) from tab") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
169 changes: 169 additions & 0 deletions
169
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/GpuHyperLogLogPlusPlus.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
/* | ||
* Copyright (c) 2024, NVIDIA CORPORATION. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.rapids.aggregate | ||
|
||
import scala.collection.immutable.Seq | ||
|
||
import ai.rapids.cudf | ||
import ai.rapids.cudf.{DType, GroupByAggregation, ReductionAggregation} | ||
import com.nvidia.spark.rapids._ | ||
import com.nvidia.spark.rapids.Arm.withResourceIfAllowed | ||
import com.nvidia.spark.rapids.RapidsPluginImplicits.ReallyAGpuExpression | ||
import com.nvidia.spark.rapids.jni.HLLPP | ||
import com.nvidia.spark.rapids.shims.ShimExpression | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} | ||
import org.apache.spark.sql.rapids.{GpuCreateNamedStruct, GpuGetStructField} | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.sql.vectorized.ColumnarBatch | ||
|
||
case class CudfHLLPP(override val dataType: DataType, | ||
precision: Int) extends CudfAggregate { | ||
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = | ||
(input: cudf.ColumnVector) => input.reduce( | ||
ReductionAggregation.HLLPP(precision), DType.STRUCT) | ||
override lazy val groupByAggregate: GroupByAggregation = | ||
GroupByAggregation.HLLPP(precision) | ||
override val name: String = "CudfHyperLogLogPlusPlus" | ||
} | ||
|
||
case class CudfMergeHLLPP(override val dataType: DataType, | ||
precision: Int) | ||
extends CudfAggregate { | ||
override lazy val reductionAggregate: cudf.ColumnVector => cudf.Scalar = | ||
(input: cudf.ColumnVector) => | ||
input.reduce(ReductionAggregation.mergeHLL(precision), DType.STRUCT) | ||
override lazy val groupByAggregate: GroupByAggregation = | ||
GroupByAggregation.mergeHLL(precision) | ||
override val name: String = "CudfMergeHyperLogLogPlusPlus" | ||
} | ||
|
||
/** | ||
* Perform the final evaluation step to compute approximate count distinct from sketches. | ||
* Input is long columns, first construct struct of long then feed to cuDF | ||
*/ | ||
case class GpuHyperLogLogPlusPlusEvaluation(childExpr: Expression, | ||
precision: Int) | ||
extends GpuExpression with ShimExpression { | ||
override def dataType: DataType = LongType | ||
|
||
override def nullable: Boolean = false | ||
|
||
override def prettyName: String = "HyperLogLogPlusPlus_evaluation" | ||
|
||
override def children: scala.Seq[Expression] = Seq(childExpr) | ||
|
||
override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { | ||
withResourceIfAllowed(childExpr.columnarEval(batch)) { sketches => | ||
val distinctValues = HLLPP.estimateDistinctValueFromSketches( | ||
sketches.getBase, precision) | ||
GpuColumnVector.from(distinctValues, LongType) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Gpu version of HyperLogLogPlusPlus | ||
* Spark APPROX_COUNT_DISTINCT on NULLs returns zero | ||
*/ | ||
case class GpuHyperLogLogPlusPlus(childExpr: Expression, relativeSD: Double) | ||
extends GpuAggregateFunction with Serializable { | ||
|
||
// Consistent with Spark | ||
private lazy val precision: Int = | ||
Math.ceil(2.0d * Math.log(1.106d / relativeSD) / Math.log(2.0d)).toInt; | ||
|
||
private lazy val numRegistersPerSketch: Int = 1 << precision; | ||
|
||
// Each long contains 10 register(max 6 bits) | ||
private lazy val numWords = numRegistersPerSketch / 10 + 1 | ||
|
||
// Spark agg buffer type: long array | ||
private lazy val sparkAggBufferAttributes: Seq[AttributeReference] = { | ||
Seq.tabulate(numWords) { i => | ||
AttributeReference(s"MS[$i]", LongType)() | ||
} | ||
} | ||
|
||
/** | ||
* Spark uses long columns to save agg buffer, e.g.: long[52] | ||
* Each long compacts multiple registers to save memory | ||
*/ | ||
override val aggBufferAttributes: Seq[AttributeReference] = sparkAggBufferAttributes | ||
|
||
/** | ||
* init long array with all zero | ||
*/ | ||
override lazy val initialValues: Seq[Expression] = Seq.tabulate(numWords) { _ => | ||
GpuLiteral(0L, LongType) | ||
} | ||
|
||
override lazy val inputProjection: Seq[Expression] = Seq(childExpr) | ||
|
||
/** | ||
* cuDF HLLPP sketch type: struct<long, ..., long> | ||
*/ | ||
private lazy val cuDFBufferType: DataType = StructType.fromAttributes(aggBufferAttributes) | ||
|
||
/** | ||
* cuDF uses Struct<long, ..., long> column to do aggregate | ||
*/ | ||
override lazy val updateAggregates: Seq[CudfAggregate] = | ||
Seq(CudfHLLPP(cuDFBufferType, precision)) | ||
|
||
/** | ||
* Convert long columns to Struct<long, ..., long> column | ||
*/ | ||
private def genStruct: Seq[Expression] = { | ||
val names = Seq.tabulate(numWords) { i => GpuLiteral(s"MS[$i]", StringType) } | ||
Seq(GpuCreateNamedStruct(names.zip(aggBufferAttributes).flatten { case (a, b) => List(a, b) })) | ||
} | ||
|
||
/** | ||
* Convert Struct<long, ..., long> column to long columns | ||
*/ | ||
override lazy val postUpdate: Seq[Expression] = Seq.tabulate(numWords) { | ||
i => GpuGetStructField(postUpdateAttr.head, i) | ||
} | ||
|
||
/** | ||
* convert to Struct<long, ..., long> | ||
*/ | ||
override lazy val preMerge: Seq[Expression] = genStruct | ||
|
||
override lazy val mergeAggregates: Seq[CudfAggregate] = | ||
Seq(CudfMergeHLLPP(cuDFBufferType, precision)) | ||
|
||
/** | ||
* Convert Struct<long, ..., long> column to long columns | ||
*/ | ||
override lazy val postMerge: Seq[Expression] = Seq.tabulate(numWords) { | ||
i => GpuGetStructField(postMergeAttr.head, i) | ||
} | ||
|
||
override lazy val evaluateExpression: Expression = | ||
GpuHyperLogLogPlusPlusEvaluation(genStruct.head, precision) | ||
|
||
override def dataType: DataType = LongType | ||
|
||
// Spark APPROX_COUNT_DISTINCT on NULLs returns zero | ||
override def nullable: Boolean = false | ||
|
||
override def prettyName: String = "approx_count_distinct" | ||
|
||
override def children: Seq[Expression] = Seq(childExpr) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Using
cpuAtomics
for a GPU field gets to be kind of confusing. Could you please create agpuAtomics
instead?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update to support map, array and list because this is merged: NVIDIA/spark-rapids-jni#2575