Introduce hybrid (CPU) scan for Parquet read [databricks] #11720
Open
res-life wants to merge 34 commits into NVIDIA:branch-25.02 from res-life:merge-c2c
+934 −10
34 commits
65de010 Merge C2C code to main
e6cede2 Update the dependencies in pom.xml
1e4fc13 revert BD velox hdfs code
46e19df fit codes into the new HybridScan hierarchy
sperlingxx 4f2a4d6 refine QueryPlan, RapidsMeta and test suites for HybridScan
sperlingxx 4d52f90 Integrate Hybrid plugin; update IT
c82eb29 Make Hybrid jar provoided scope; Update shim to only applicable for S…
65b585a Fix comments
d214739 Code comment update, a minor change
e0f1e3b Fix shim logic
6331ab8 Fix shims: build for all shims, but report error when Spark is CDH or…
b1b8481 Remove useless shim code
dbae63f IT: add tests for decimal types
5e972d6 Add checks for Java/Scala version, only supports Java 1.8 and Scala 2.12
c6fa249 Check datasource is v1
e95e7cc Update test case: skip if runtime Spark is Databricks
092dab8 Update Hybrid Config doc: not all Spark versions are fully tested, on…
519f33c Merge branch 'branch-25.02' into merge-c2c
b3b6f80 some refinement
sperlingxx f0921a4 fix tests && unsupported types
sperlingxx dd5d8f9 Add doc for Hybrid execution feature
6149589 Update doc
114b93a Check Hybrid jar in executor
36d3cdf Update hybrid-execution.md
winningsix 5bfd763 Remove check for Java versions
275fa3d Fix scala 2.13 check failure
cbb5609 Fix for Scala 2.13 building
c1df7c4 Fix: specify default value for loadBackend to avoid exception
99602c5 Update Copyright to add new year 2025
3d3b172 Fix Databricks building
d718d8c Minor format change
4e79c2b Merge branch 'branch-25.02' into merge-c2c
aae45b9 Add shim 354 for Hybrid feature
4fd5fdb Fix Databricks building

57 changes: 57 additions & 0 deletions
hybrid-execution.md
---
layout: page
title: The Hybrid (on CPU) execution
nav_order: 14
parent: Developer Overview
---

# The Hybrid (CPU/GPU) execution
Note: this is currently an experimental feature.

## Overview
Hybrid execution provides a way to offload the Parquet scan onto the CPU by leveraging Gluten/Velox.

## Configuration
To enable Hybrid execution, set the following configurations:
```
"spark.sql.sources.useV1SourceList": "parquet"
"spark.rapids.sql.parquet.useHybridReader": "true"
"spark.rapids.sql.hybrid.loadBackend": "true"
```
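
As an illustration, here is a minimal sketch of applying these settings when building a `SparkSession`; the app name and the builder-based approach are illustrative, not part of this PR:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: the three settings above, applied at session startup.
// loadBackend initializes the native Gluten/Velox backend, so it is assumed
// here to be set before the session is created rather than toggled later.
val spark = SparkSession.builder()
  .appName("hybrid-scan-demo") // illustrative name
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin") // the RAPIDS plugin itself
  .config("spark.sql.sources.useV1SourceList", "parquet")
  .config("spark.rapids.sql.parquet.useHybridReader", "true")
  .config("spark.rapids.sql.hybrid.loadBackend", "true")
  .getOrCreate()
```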

## Build
### Build the Gluten bundle and third-party jars
Hybrid execution targets the Gluten v1.2.0 code tag.
For building Gluten, please refer to the [Gluten project](https://github.com/apache/incubator-gluten).
Start the Docker container provided by the Gluten project, then execute the following:
```bash
git clone https://github.com/apache/incubator-gluten.git
cd incubator-gluten
git checkout v1.2.0
# Cherry-pick a fix from the main branch: Fix ObjectStore::stores initialized twice issue
git cherry-pick 2a6a974d6fbaa38869eb9a0b91b2e796a578884c
./dev/package.sh
```
Note: the cherry-picked fix shown above is required.
The bundle and third-party jars can then be found under `$Gluten_ROOT/package/target`.

### Download the Rapids Hybrid jar from the Maven repository
```xml
<dependency>
    <groupId>com.nvidia</groupId>
    <artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
    <version>${project.version}</version>
</dependency>
```

## How to use
Decide on the Spark version, set the configurations described above, and prepare the
Gluten bundle and third-party jars for that Spark version as described above. Get the
Rapids Hybrid jar, then put the three jars (the two Gluten jars and the Rapids Hybrid
jar) on the classpath by specifying:
`--jars=<gluten-bundle-jar>,<gluten-thirdparty-jar>,<rapids-hybrid-jar>`
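
Once the job is running, one way to confirm that the hybrid scan is active is to look for `HybridFileSourceScanExec` in the physical plan (the operator name checked by this PR's integration tests). A hedged sketch, assuming an illustrative data path:

```scala
// Illustrative check, not part of this PR: inspect the physical plan.
val df = spark.read.parquet("/path/to/data.parquet") // placeholder path
val plan = df.queryExecution.executedPlan.toString
// HybridFileSourceScanExec means the CPU hybrid scan is in use;
// GpuFileSourceScanExec means the scan fell back to the GPU reader.
println(plan.contains("HybridFileSourceScanExec"))
```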

## Limitations
- Only the V1 Parquet data source is supported.
- Only Scala 2.12 is supported; Scala 2.13 is not.
- Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 are supported, matching [the versions Gluten supports](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0);
  other Spark 3.2.x, 3.3.x, 3.4.x, and 3.5.x versions may also work but are not fully tested.

147 changes: 147 additions & 0 deletions
integration_tests/src/main/python/hybrid_parquet_test.py
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from asserts import *
from data_gen import *
from marks import *
from parquet_test import rebase_write_corrected_conf
from spark_session import *

"""
Hybrid Scan unsupported types:
1. Decimal with negative scale is NOT supported
2. Decimal128 inside nested types is NOT supported
3. BinaryType is NOT supported
4. MapType wrapped by NestedType (Struct of Map/Array of Map/Map of Map) is NOT fully supported
"""
parquet_gens_list = [
    [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
     string_gen, boolean_gen, date_gen,
     TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), ArrayGen(byte_gen),
     ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
     ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
     ArrayGen(decimal_gen_64bit),
     ArrayGen(ArrayGen(byte_gen)),
     StructGen([['child0', ArrayGen(byte_gen)],
                ['child1', byte_gen],
                ['child2', float_gen],
                ['child3', decimal_gen_64bit]]),
     ArrayGen(StructGen([['child0', string_gen],
                         ['child1', double_gen],
                         ['child2', int_gen]]))
     ],
    [MapGen(f(nullable=False), f()) for f in [
        BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen,
        lambda nullable=True: TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc), nullable=nullable)]
     ],
    [simple_string_to_string_map_gen,
     MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
     MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
     ],
    decimal_gens,
]

parquet_gens_fallback_lists = [
    # Decimal128 inside nested types is NOT supported
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), decimal_gen_128bit)],
    # BinaryType is NOT supported
    [BinaryGen()],
    # MapType wrapped by NestedType is NOT fully supported
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)],
    [ArrayGen(simple_string_to_string_map_gen)],
    [ArrayGen(ArrayGen(simple_string_to_string_map_gen))],
    [ArrayGen(StructGen([["c0", simple_string_to_string_map_gen]]))],
    [StructGen([["c0", simple_string_to_string_map_gen]])],
    [StructGen([["c0", ArrayGen(simple_string_to_string_map_gen)]])],
    [StructGen([["c0", StructGen([["cc0", simple_string_to_string_map_gen]])]])],
]


@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('gen_rows', [20, 100, 512, 1024, 4096], ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_round_trip(spark_tmp_path, parquet_gens, gen_rows):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
        })


# Create scenarios in which CoalesceConverter will coalesce several input batches, by adjusting
# reader_batch_size and coalesced_batch_size, to test whether CoalesceConverter functions
# correctly when coalescing is needed.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('reader_batch_size', [512, 1024, 2048], ids=idfn)
@pytest.mark.parametrize('coalesced_batch_size', [1 << 25, 1 << 27], ids=idfn)
@pytest.mark.parametrize('gen_rows', [8192, 10000], ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_round_trip_multiple_batches(spark_tmp_path,
                                                         reader_batch_size,
                                                         coalesced_batch_size,
                                                         gen_rows):
    gens = []
    for g in parquet_gens_list:
        gens.extend(g)

    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
            'spark.gluten.sql.columnar.maxBatchSize': reader_batch_size,
            'spark.rapids.sql.batchSizeBytes': coalesced_batch_size,
        })


# HybridScan shall NOT be enabled over unsupported data types; instead, it falls back to GpuScan.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', parquet_gens_fallback_lists, ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_fallback_to_gpu(spark_tmp_path, parquet_gens):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=512).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_cpu_and_gpu_are_equal_collect_with_capture(
        lambda spark: spark.read.parquet(data_path),
        exist_classes='GpuFileSourceScanExec',
        non_exist_classes='HybridFileSourceScanExec',
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
        })

37 changes: 37 additions & 0 deletions
sql-plugin/src/main/scala/com/nvidia/spark/rapids/HybridExecutionUtils.scala
/*
 * Copyright (c) 2024-2025, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids

object HybridExecutionUtils {

  private val HYBRID_JAR_PLUGIN_CLASS_NAME = "com.nvidia.spark.rapids.hybrid.HybridPluginWrapper"

  /**
   * Check that the Hybrid jar is in the classpath;
   * report an error if it is not.
   */
  def checkHybridJarInClassPath(): Unit = {
    try {
      Class.forName(HYBRID_JAR_PLUGIN_CLASS_NAME)
    } catch {
      case e: ClassNotFoundException => throw new RuntimeException(
        "Hybrid jar is not in the classpath. Please add the Hybrid jar to the classpath, or " +
        "disable the Hybrid feature by setting " +
        "spark.rapids.sql.parquet.useHybridReader=false", e)
    }
  }
}
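
A hedged sketch of how a call site might use this check; the `useHybridParquetReader` accessor below is hypothetical, standing in for however the plugin reads `spark.rapids.sql.parquet.useHybridReader`:

```scala
// Illustrative only: fail fast during plugin startup when the hybrid reader
// is enabled but the Hybrid jar is absent, instead of hitting a late
// ClassNotFoundException on the first Parquet scan.
if (rapidsConf.useHybridParquetReader) { // hypothetical config accessor
  HybridExecutionUtils.checkHybridJarInClassPath()
}
```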
nit: Can we add a few comments about the cases where this appears to be better than the current Parquet scan, so that customers can know whether it is worth the effort to try it out?
Do we need/want to mention some of the limitations with different data types? And are there any Gluten-specific configs that they need to set to make this work for them?
#11966