Enable Hybrid test cases in premerge/nightly CIs [databricks] #11906
base: branch-25.02
Changes from 35 commits
@@ -0,0 +1,57 @@
---
layout: page
title: The Hybrid (on CPU) execution
nav_order: 14
parent: Developer Overview
---

# The Hybrid (CPU/GPU) execution
Note: this is currently an experimental feature.

## Overview
Hybrid execution provides a way to offload the Parquet scan onto the CPU by leveraging Gluten/Velox.
## Configuration
To enable Hybrid execution, set the following configurations:
```
"spark.sql.sources.useV1SourceList": "parquet"
"spark.rapids.sql.parquet.useHybridReader": "true"
"spark.rapids.sql.hybrid.loadBackend": "true"
```
## Build
### Build the Gluten bundle and third-party jars
Hybrid execution targets the Gluten v1.2.0 code tag.
For building Gluten, please refer to the [Gluten project](https://github.com/apache/incubator-gluten).
Start the docker container the Gluten project provides, then execute the following:
```bash
git clone https://github.com/apache/incubator-gluten.git
cd incubator-gluten
git checkout v1.2.0
# Cherry-pick a fix from the main branch: Fix ObjectStore::stores initialized twice issue
git cherry-pick 2a6a974d6fbaa38869eb9a0b91b2e796a578884c
./dev/package.sh
```
Note: the cherry-pick shown in the steps above is required.
In $Gluten_ROOT/package/target, you can find the bundle and third-party jars.
### Download the RAPIDS Hybrid jar from the Maven repo
```xml
<dependency>
    <groupId>com.nvidia</groupId>
    <artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
    <version>${project.version}</version>
</dependency>
```
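For a one-off download without a pom, the same artifact can be fetched with `mvn dependency:get`, mirroring what the CI script later in this PR does. This is a sketch: the version below is a hypothetical example, not a real release pin, and the command is composed into a variable and echoed so it can be inspected before running.

```shell
# Hypothetical version for illustration only -- substitute the release you target.
HYBRID_VERSION=25.02.0
HYBRID_JAR="rapids-4-spark-hybrid_2.12-${HYBRID_VERSION}.jar"
# Composed and echoed rather than executed; run the printed command to download.
CMD="mvn -B dependency:get \
  -DgroupId=com.nvidia \
  -DartifactId=rapids-4-spark-hybrid_2.12 \
  -Dversion=${HYBRID_VERSION} \
  -Dpackaging=jar \
  -Dtransitive=false \
  -Ddest=/tmp/${HYBRID_JAR}"
echo "$CMD"
```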
## How to use
Decide on the Spark version and set the configurations as described in the section above.
Prepare the Gluten bundle and third-party jars for that Spark version as described
in the Build section, and get the RAPIDS Hybrid jar. Put the jars (the two Gluten jars and
the RAPIDS Hybrid jar) on the classpath by specifying:
`--jars=<gluten-bundle-jar>,<gluten-thirdparty-jar>,<rapids-hybrid-jar>`
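Putting the steps above together, a launch might look like the following sketch. The jar paths are placeholders for illustration, not the actual artifact file names, and the command is echoed rather than executed so it can be inspected first.

```shell
# Placeholder jar paths -- point these at the jars you built or downloaded.
GLUTEN_BUNDLE_JAR=/tmp/gluten-velox-bundle.jar
GLUTEN_THIRD_PARTY_JAR=/tmp/gluten-thirdparty-lib.jar
RAPIDS_HYBRID_JAR=/tmp/rapids-4-spark-hybrid_2.12.jar

JARS="${GLUTEN_BUNDLE_JAR},${GLUTEN_THIRD_PARTY_JAR},${RAPIDS_HYBRID_JAR}"

# Remove the echo to launch for real.
echo spark-shell \
  --jars="${JARS}" \
  --conf spark.sql.sources.useV1SourceList=parquet \
  --conf spark.rapids.sql.parquet.useHybridReader=true \
  --conf spark.rapids.sql.hybrid.loadBackend=true
```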
## Limitations
- Only supports the V1 Parquet data source.
- Only supports Scala 2.12; Scala 2.13 is not supported.
- Supports Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1, matching [what Gluten supports](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0);
  other Spark versions in the 3.2.x, 3.3.x, 3.4.x, and 3.5.x lines may also work, but are not fully tested.
@@ -0,0 +1,147 @@
# Copyright (c) 2024-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import pytest

from asserts import *
from data_gen import *
from marks import *
from parquet_test import rebase_write_corrected_conf
from spark_session import *

"""
Hybrid Scan unsupported types:
1. Decimal with negative scale is NOT supported
2. Decimal128 inside nested types is NOT supported
3. BinaryType is NOT supported
4. MapType wrapped by NestedType (Struct of Map/Array of Map/Map of Map) is NOT fully supported
"""
parquet_gens_list = [
    [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
     string_gen, boolean_gen, date_gen,
     TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)), ArrayGen(byte_gen),
     ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
     ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
     ArrayGen(decimal_gen_64bit),
     ArrayGen(ArrayGen(byte_gen)),
     StructGen([['child0', ArrayGen(byte_gen)],
                ['child1', byte_gen],
                ['child2', float_gen],
                ['child3', decimal_gen_64bit]]),
     ArrayGen(StructGen([['child0', string_gen],
                         ['child1', double_gen],
                         ['child2', int_gen]]))
     ],
    [MapGen(f(nullable=False), f()) for f in [
        BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen,
        lambda nullable=True: TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc), nullable=nullable)]
     ],
    [simple_string_to_string_map_gen,
     MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10),
     MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
     ],
    decimal_gens,
]

parquet_gens_fallback_lists = [
    # Decimal128 inside nested types is NOT supported
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), decimal_gen_128bit)],
    # BinaryType is NOT supported
    [BinaryGen()],
    # MapType wrapped by NestedType is NOT fully supported
    [MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)],
    [ArrayGen(simple_string_to_string_map_gen)],
    [ArrayGen(ArrayGen(simple_string_to_string_map_gen))],
    [ArrayGen(StructGen([["c0", simple_string_to_string_map_gen]]))],
    [StructGen([["c0", simple_string_to_string_map_gen]])],
    [StructGen([["c0", ArrayGen(simple_string_to_string_map_gen)]])],
    [StructGen([["c0", StructGen([["cc0", simple_string_to_string_map_gen]])]])],
]


@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('gen_rows', [20, 100, 512, 1024, 4096], ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_round_trip(spark_tmp_path, parquet_gens, gen_rows):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
        })


# Create scenarios in which CoalesceConverter will coalesce several input batches, by adjusting
# reader_batch_size and coalesced_batch_size; this tests whether CoalesceConverter functions
# correctly when coalescing is needed.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('reader_batch_size', [512, 1024, 2048], ids=idfn)
@pytest.mark.parametrize('coalesced_batch_size', [1 << 25, 1 << 27], ids=idfn)
@pytest.mark.parametrize('gen_rows', [8192, 10000], ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_round_trip_multiple_batches(spark_tmp_path,
                                                         reader_batch_size,
                                                         coalesced_batch_size,
                                                         gen_rows):
    gens = []
    for g in parquet_gens_list:
        gens.extend(g)

    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
            'spark.gluten.sql.columnar.maxBatchSize': reader_batch_size,
            'spark.rapids.sql.batchSizeBytes': coalesced_batch_size,
        })


# HybridScan shall NOT be enabled over unsupported data types; instead, it falls back to GpuScan.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', parquet_gens_fallback_lists, ids=idfn)
@hybrid_test
def test_hybrid_parquet_read_fallback_to_gpu(spark_tmp_path, parquet_gens):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=512).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_cpu_and_gpu_are_equal_collect_with_capture(
        lambda spark: spark.read.parquet(data_path),
        exist_classes='GpuFileSourceScanExec',
        non_exist_classes='HybridFileSourceScanExec',
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
        })
@@ -0,0 +1,51 @@
#!/bin/bash
#
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -ex

. jenkins/version-def.sh

test_hybrid_feature() {
    echo "Run hybrid execution test cases..."

    # parameters for the Hybrid feature
    spark_prefix="${SPARK_VER:0:3}" # get the prefix from SPARK_VER, e.g.: 3.2, 3.3 ... 3.5
    GLUTEN_BUNDLE_JAR="gluten-velox-bundle-spark${spark_prefix}_2.12-ubuntu_${GLUTEN_FOR_OS}_x86_64-${GLUTEN_VERSION}.jar"
    HYBRID_JAR="rapids-4-spark-hybrid_2.12-${PROJECT_TEST_VER}.jar"
    GLUTEN_THIRD_PARTY_JAR="gluten-thirdparty-lib-${GLUTEN_VERSION}-ubuntu-${GLUTEN_FOR_OS}-x86_64.jar"

    # download the Gluten and Hybrid jars
    mvn -B dependency:get -DgroupId=com.nvidia \
        -DartifactId=gluten-velox-bundle \
        -Dversion=${GLUTEN_VERSION} \
        -Dpackaging=jar \
        -Dclassifier=spark${spark_prefix}_2.12-ubuntu_${GLUTEN_FOR_OS} \
        -Dtransitive=false \
        -Ddest=/tmp/$GLUTEN_BUNDLE_JAR
    mvn -B dependency:get -DgroupId=com.nvidia \
        -DartifactId=rapids-4-spark-hybrid_2.12 \
        -Dversion=${PROJECT_TEST_VER} \
        -Dpackaging=jar \
        -Dtransitive=false \
        -Ddest=/tmp/$HYBRID_JAR
    wget -O /tmp/${GLUTEN_THIRD_PARTY_JAR} ${MVN_URM_MIRROR}/com/nvidia/gluten-thirdparty-lib/${GLUTEN_VERSION}/${GLUTEN_THIRD_PARTY_JAR}

    # run the Hybrid Python tests
    LOAD_HYBRID_BACKEND=1 \
    HYBRID_BACKEND_JARS=/tmp/${HYBRID_JAR},/tmp/${GLUTEN_BUNDLE_JAR},/tmp/${GLUTEN_THIRD_PARTY_JAR} \
    ./integration_tests/run_pyspark_from_build.sh -m hybrid_test
}

> Review comment (on the first `mvn dependency:get`): nit: avoid using mvn dependency:get if possible, the IT run does not require a mvn dep.

> Review comment (on the `-Dclassifier` line): typo ?
@@ -1,6 +1,6 @@
#!/bin/bash
#
-# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2020-2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.

@@ -109,6 +109,10 @@ mvn_verify() {
    do
        TZ=$tz ./integration_tests/run_pyspark_from_build.sh -m tz_sensitive_test
    done

    # test the Hybrid feature
    source "${WORKSPACE}/jenkins/hybrid_execution.sh"
    test_hybrid_feature
}

> Review comment (on the `source` line): how long will this test take? We may need the duration to determine whether to put it into the ci1 or ci2 stage for balancing the workloads.

rapids_shuffle_smoke_test() {
@@ -26,6 +26,9 @@ for VAR in $OVERWRITE_PARAMS; do
done
IFS=$PRE_IFS

# configs for the Hybrid feature
GLUTEN_VERSION=1.2.0
GLUTEN_FOR_OS=20.04

CUDA_CLASSIFIER=${CUDA_CLASSIFIER:-"cuda11"}
CLASSIFIER=${CLASSIFIER:-"$CUDA_CLASSIFIER"} # default as CUDA_CLASSIFIER for compatibility

> Review comment (on the Hybrid configs): Better to use the default-value pattern instead of hardcoding, then we can overwrite these vars from outside without changing this script.
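The default-value pattern the reviewer suggests for the hardcoded Hybrid versions can be sketched as follows; the fallback literals are the values the script currently hardcodes, and the environment wins when a value is already set.

```shell
# Default-value pattern: an exported GLUTEN_VERSION/GLUTEN_FOR_OS overrides
# these fallbacks, so callers can pick versions without editing the script.
GLUTEN_VERSION=${GLUTEN_VERSION:-"1.2.0"}
GLUTEN_FOR_OS=${GLUTEN_FOR_OS:-"20.04"}
echo "GLUTEN_VERSION=${GLUTEN_VERSION} GLUTEN_FOR_OS=${GLUTEN_FOR_OS}"
```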
> Review comment (on the test function): It's better to make this script prepare artifacts only and have the actual caller run the test command directly, or at least separate the artifact-preparation function from the actual test-call function, sth like