Introduce hybrid (CPU) scan for Parquet read [databricks] #11720

Open
wants to merge 34 commits into base: branch-25.02

Changes from 6 commits (34 commits total)
65de010
Merge C2C code to main
Nov 12, 2024
e6cede2
Update the dependencies in pom.xml
Nov 13, 2024
1e4fc13
revert BD velox hdfs code
Nov 15, 2024
46e19df
fit codes into the new HybridScan hierarchy
sperlingxx Nov 18, 2024
4f2a4d6
refine QueryPlan, RapidsMeta and test suites for HybridScan
sperlingxx Nov 20, 2024
4d52f90
Integrate Hybrid plugin; update IT
Nov 25, 2024
c82eb29
Make Hybrid jar provided scope; Update shim to only applicable for S…
Dec 4, 2024
65b585a
Fix comments
Dec 4, 2024
d214739
Code comment update, a minor change
Dec 5, 2024
e0f1e3b
Fix shim logic
Dec 5, 2024
6331ab8
Fix shims: build for all shims, but report error when Spark is CDH or…
Dec 6, 2024
b1b8481
Remove useless shim code
Dec 9, 2024
dbae63f
IT: add tests for decimal types
Dec 9, 2024
5e972d6
Add checks for Java/Scala version, only supports Java 1.8 and Scala 2.12
Dec 9, 2024
c6fa249
Check datasource is v1
Dec 10, 2024
e95e7cc
Update test case: skip if runtime Spark is Databricks
Dec 11, 2024
092dab8
Update Hybrid Config doc: not all Spark versions are fully tested, on…
Dec 11, 2024
519f33c
Merge branch 'branch-25.02' into merge-c2c
Dec 11, 2024
b3b6f80
some refinement
sperlingxx Dec 13, 2024
f0921a4
fix tests && unsupported types
sperlingxx Dec 17, 2024
dd5d8f9
Add doc for Hybrid execution feature
Dec 23, 2024
6149589
Update doc
Dec 24, 2024
114b93a
Check Hybrid jar in executor
Dec 24, 2024
36d3cdf
Update hybrid-execution.md
winningsix Dec 25, 2024
5bfd763
Remove check for Java versions
Dec 25, 2024
275fa3d
Fix scala 2.13 check failure
Dec 25, 2024
cbb5609
Fix for Scala 2.13 building
Dec 30, 2024
c1df7c4
Fix: specify default value for loadBackend to avoid exception
Jan 13, 2025
99602c5
Update Copyright to add new year 2025
Jan 13, 2025
3d3b172
Fix Databricks building
Jan 14, 2025
d718d8c
Minor format change
Jan 14, 2025
4e79c2b
Merge branch 'branch-25.02' into merge-c2c
Jan 14, 2025
aae45b9
Add shim 354 for Hybrid feature
Jan 14, 2025
4fd5fdb
Fix Databricks building
Jan 14, 2025
16 changes: 16 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
@@ -364,6 +364,22 @@ EOF
fi
export PYSP_TEST_spark_rapids_memory_gpu_allocSize=${PYSP_TEST_spark_rapids_memory_gpu_allocSize:-'1536m'}

# When $LOAD_HYBRID_BACKEND is set, wire up the hybrid backend jars and the related configs
# so that the hybrid backend is active while running the subsequent integration tests.
if [[ "$LOAD_HYBRID_BACKEND" -eq 1 ]]; then
    if [ -z "${HYBRID_BACKEND_JARS}" ]; then
        echo "Error: Environment HYBRID_BACKEND_JARS is not set."
        exit 1
    fi
    export PYSP_TEST_spark_jars="${PYSP_TEST_spark_jars},${HYBRID_BACKEND_JARS//:/,}"
    export PYSP_TEST_spark_rapids_sql_parquet_useHybridReader=true
    export PYSP_TEST_spark_rapids_sql_hybrid_loadBackend=true
    export PYSP_TEST_spark_memory_offHeap_enabled=true
    export PYSP_TEST_spark_memory_offHeap_size=512M
    export PYSP_TEST_spark_rapids_sql_hybrid_load=true
    export PYSP_TEST_spark_gluten_loadLibFromJar=true
fi

SPARK_SHELL_SMOKE_TEST="${SPARK_SHELL_SMOKE_TEST:-0}"
if [[ "${SPARK_SHELL_SMOKE_TEST}" != "0" ]]; then
echo "Running spark-shell smoke test..."
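For context, the run script translates each PYSP_TEST_spark_* variable above into the corresponding Spark config (underscores become dots). Below is a minimal sketch of an equivalent standalone PySpark setup; the jar paths and the com.nvidia.spark.SQLPlugin class are assumptions based on a typical spark-rapids deployment, not something this diff prescribes:

from pyspark.sql import SparkSession

# Sketch only: paths are placeholders; the config keys mirror the block added above.
spark = (
    SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.jars", "/path/to/rapids-4-spark.jar,/path/to/rapids-4-spark-hybrid.jar")
    # Load the hybrid backend as an extra plugin at launch time (startup-only).
    .config("spark.rapids.sql.hybrid.loadBackend", "true")
    # Route Parquet scans through the CPU (Velox/Gluten) hybrid reader.
    .config("spark.rapids.sql.parquet.useHybridReader", "true")
    # The native backend works out of off-heap memory.
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "512M")
    .config("spark.gluten.loadLibFromJar", "true")
    .getOrCreate()
)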
1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
@@ -35,3 +35,4 @@
pyarrow_test = pytest.mark.pyarrow_test
datagen_overrides = pytest.mark.datagen_overrides
tz_sensitive_test = pytest.mark.tz_sensitive_test
hybrid_test = pytest.mark.hybrid_test
79 changes: 79 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
@@ -1650,3 +1650,82 @@ def setup_table(spark):
with_cpu_session(lambda spark: setup_table(spark))
assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).select("p"),
conf={"spark.rapids.sql.columnSizeBytes": "100"})

"""
VeloxScan:
1. DecimalType is NOT fully supported
revans2 marked this conversation as resolved.
Show resolved Hide resolved
2. TimestampType can NOT be the KeyType of MapType
3. NestedMap is disabled because it may produce incorrect result (usually occurring when table is very small)
"""
velox_gens = [
    [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
     string_gen, boolean_gen, date_gen,
     TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)),
     ArrayGen(byte_gen),
     ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
     ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
     ArrayGen(ArrayGen(byte_gen)),
     StructGen([['child0', ArrayGen(byte_gen)], ['child1', byte_gen], ['child2', float_gen],
                ['child3', decimal_gen_64bit]]),
     ArrayGen(StructGen([['child0', string_gen], ['child1', double_gen], ['child2', int_gen]]))
     ],
    [MapGen(f(nullable=False), f()) for f in [
        BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen]
     ],
    [simple_string_to_string_map_gen,
     MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen),
            max_length=10),
     MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
     # TODO: It seems that Velox Parquet Scan can NOT handle nested Map correctly
     # MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)
     ],
]

Collaborator (review comment):

Can we add some tests to validate that predicate push down and filtering is working correctly? It would be nice to have

  1. simple filters
  2. complex filters that are not supported by normal parquet predicate push down (like ORs at the top level instead of ANDs)
  3. filters that have operators in them that Velox does not support, but spark-rapids does.

res-life (Collaborator, Author) replied on Dec 16, 2024:

Discussed internally before; the decision is to put this into a follow-up PR.

res-life (Collaborator, Author):

Follow-up issue filed: #11892
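As a rough illustration of item 1 in the comment above (not part of this PR, and superseded by whatever lands in #11892), a simple-filter round-trip test could reuse the helpers already used in this file; the test name and filter expression are made up:

@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@hybrid_test
def test_parquet_read_simple_filter_hybrid(spark_tmp_path):
    # Hypothetical sketch: verify that a filtered read returns the same rows whether the scan
    # runs on the CPU hybrid reader or on the regular path.
    gen_list = [('_c0', int_gen), ('_c1', string_gen)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=4096).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path).filter('_c0 > 0 or _c1 is null'),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
        })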

@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', velox_gens, ids=idfn)
@pytest.mark.parametrize('gen_rows', [20, 100, 512, 1024, 4096], ids=idfn)
@hybrid_test
def test_parquet_read_round_trip_hybrid(spark_tmp_path, parquet_gens, gen_rows):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
        })

# Create scenarios in which the CoalesceConverter must coalesce several input batches by
# adjusting reader_batch_size and coalesced_batch_size, verifying that the converter works
# correctly when coalescing is needed.
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('reader_batch_size', [512, 1024, 2048], ids=idfn)
@pytest.mark.parametrize('coalesced_batch_size', [1 << 25, 1 << 27], ids=idfn)
@pytest.mark.parametrize('gen_rows', [8192, 10000], ids=idfn)
@hybrid_test
def test_parquet_read_round_trip_hybrid_multiple_batches(spark_tmp_path,
                                                         reader_batch_size,
                                                         coalesced_batch_size,
                                                         gen_rows):
    gens = []
    for g in velox_gens:
        gens.extend(g)

    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
        conf=rebase_write_corrected_conf)

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={
            'spark.sql.sources.useV1SourceList': 'parquet',
            'spark.rapids.sql.parquet.useHybridReader': 'true',
            'spark.gluten.sql.columnar.maxBatchSize': reader_batch_size,
            'spark.rapids.sql.batchSizeBytes': coalesced_batch_size,
        })
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -327,3 +327,6 @@ def is_hive_available():
    if is_at_least_precommit_run():
        return True
    return _spark.conf.get("spark.sql.catalogImplementation") == "hive"

def is_hybrid_backend_loaded():
    return _spark.conf.get("spark.rapids.sql.hybrid.load") == "true"
6 changes: 6 additions & 0 deletions sql-plugin/pom.xml
@@ -102,6 +102,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<!-- #if scala-2.13 --><!--
<profiles>
@@ -2723,6 +2723,12 @@ case class ParquetTableReader(
}

override def close(): Unit = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
logWarning(s"Wrote data for $splitsString to $p")
}
}
reader.close()
buffer.close()
}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

private class GpuRowToColumnConverter(schema: StructType) extends Serializable {
class GpuRowToColumnConverter(schema: StructType) extends Serializable {
private val converters = schema.fields.map {
f => GpuRowToColumnConverter.getConverterForType(f.dataType, f.nullable)
}
@@ -594,7 +594,8 @@ class RowToColumnarIterator(
numOutputRows: GpuMetric = NoopMetric,
numOutputBatches: GpuMetric = NoopMetric,
streamTime: GpuMetric = NoopMetric,
opTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] {
opTime: GpuMetric = NoopMetric,
acquireGpuTime: GpuMetric = NoopMetric) extends Iterator[ColumnarBatch] {

private val targetSizeBytes = localGoal.targetSizeBytes
private var targetRows = 0
@@ -650,7 +651,11 @@ class RowToColumnarIterator(
// note that TaskContext.get() can return null during unit testing so we wrap it in an
// option here
Option(TaskContext.get())
.foreach(ctx => GpuSemaphore.acquireIfNecessary(ctx))
.foreach { ctx =>
val acquireGpuStart = System.nanoTime()
GpuSemaphore.acquireIfNecessary(ctx)
acquireGpuTime += System.nanoTime() - acquireGpuStart
}

val ret = withResource(new NvtxWithMetrics("RowToColumnar", NvtxColor.GREEN,
opTime)) { _ =>
@@ -31,6 +31,7 @@ import com.nvidia.spark.DFUDFPlugin
import com.nvidia.spark.rapids.RapidsConf.AllowMultipleJars
import com.nvidia.spark.rapids.RapidsPluginUtils.buildInfoEvent
import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
import com.nvidia.spark.rapids.hybrid.HybridPluginWrapper
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import org.apache.commons.lang3.exception.ExceptionUtils
@@ -98,7 +99,8 @@ object RapidsPluginUtils extends Logging {
s"private revision ${privateRev}")
}

val extraPlugins = getExtraPlugins
val extraPlugins = getExtraPlugins ++
Seq(new HybridPluginWrapper().asInstanceOf[SparkPlugin])

def logPluginMode(conf: RapidsConf): Unit = {
if (conf.isSqlEnabled && conf.isSqlExecuteOnGPU) {
18 changes: 18 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -28,6 +28,7 @@ import com.nvidia.spark.rapids.lore.{LoreId, OutputLoreId}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rapids.hybrid.HybridBackend
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.RapidsPrivateUtil
@@ -1688,6 +1689,19 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(false)

val HYBRID_PARQUET_READER = conf("spark.rapids.sql.parquet.useHybridReader")
.doc("Use HybridScan to read Parquet data via CPUs")
.internal()
.booleanConf
.createWithDefault(false)

// spark.rapids.sql.hybrid.loadBackend defined at HybridPluginWrapper of spark-rapids-private
val LOAD_HYBRID_BACKEND = conf(HybridBackend.LOAD_BACKEND_KEY)
.doc("Load hybrid backend as an extra plugin of spark-rapids during launch time")
.startupOnly()
.booleanConf
.createWithDefault(false)

val HASH_AGG_REPLACE_MODE = conf("spark.rapids.sql.hashAgg.replaceMode")
.doc("Only when hash aggregate exec has these modes (\"all\" by default): " +
"\"all\" (try to replace all aggregates, default), " +
@@ -2829,6 +2843,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val avroDebugDumpAlways: Boolean = get(AVRO_DEBUG_DUMP_ALWAYS)

lazy val useHybridParquetReader: Boolean = get(HYBRID_PARQUET_READER)

lazy val loadHybridBackend: Boolean = get(LOAD_HYBRID_BACKEND)

lazy val hashAggReplaceMode: String = get(HASH_AGG_REPLACE_MODE)

lazy val partialMergeDistinctEnabled: Boolean = get(PARTIAL_MERGE_DISTINCT_ENABLED)
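A note on how the two new configs are intended to interact: spark.rapids.sql.hybrid.loadBackend is declared startupOnly, while spark.rapids.sql.parquet.useHybridReader is an ordinary (internal) SQL conf. Below is a hedged PySpark sketch, assuming the usual RapidsConf behavior that non-startup confs can be changed on a live session; the table path is illustrative:

# The backend must be loaded at launch time, e.g.
#   spark-submit --conf spark.rapids.sql.hybrid.loadBackend=true ...
# whereas the reader flag can be toggled on an existing session:
spark.conf.set("spark.rapids.sql.parquet.useHybridReader", "true")
spark.read.parquet("/path/to/table").count()    # this scan goes through the CPU hybrid reader
spark.conf.set("spark.rapids.sql.parquet.useHybridReader", "false")
spark.read.parquet("/path/to/table").count()    # this scan uses the regular GPU Parquet reader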
@@ -0,0 +1,154 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.hybrid

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.{AcquireFailed, GpuColumnVector, GpuMetric, GpuSemaphore, NvtxWithMetrics}
import com.nvidia.spark.rapids.Arm._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.hybrid.{CoalesceBatchConverter => NativeConverter}
import com.nvidia.spark.rapids.hybrid.RapidsHostColumn

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

class CoalesceConvertIterator(veloxIter: Iterator[ColumnarBatch],
targetBatchSizeInBytes: Int,
schema: StructType,
metrics: Map[String, GpuMetric])
extends Iterator[Array[RapidsHostColumn]] with Logging {

private var converterImpl: Option[NativeConverter] = None

private var srcExhausted = false

private val converterMetrics = Map(
"C2COutputSize" -> GpuMetric.unwrap(metrics("C2COutputSize")))

override def hasNext(): Boolean = {
// either the converter still holds data or the upstream iterator has more data
val ret = withResource(new NvtxWithMetrics("VeloxC2CHasNext", NvtxColor.WHITE,
metrics("C2CStreamTime"))) { _ =>
converterImpl.exists(c => c.isDeckFilled || c.hasProceedingBuilders) ||
(!srcExhausted && veloxIter.hasNext)
}
if (!ret) {
if (!srcExhausted) {
srcExhausted = true
}
converterImpl.foreach { c =>
// VeloxBatchConverter collects the eclipsedTime of C2C_Conversion by itself.
// Here we fetch the final value before closing it.
metrics("C2CTime") += c.eclipsedNanoSecond
// release the native instance once the upstream iterator has been exhausted
val detailedMetrics = c.close()
val tID = TaskContext.get().taskAttemptId()
logError(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
converterImpl = None
}
}
ret
}

override def next(): Array[RapidsHostColumn] = {
val ntvx = new NvtxWithMetrics("VeloxC2CNext", NvtxColor.YELLOW, metrics("C2CStreamTime"))
withResource(ntvx) { _ =>
while (true) {
converterImpl.foreach { impl =>
val needFlush = if (veloxIter.hasNext) {
// The only condition leading to a non-empty deck is that the target buffers were left
// unset after the previous flush
if (impl.isDeckFilled) {
impl.setupTargetVectors()
}
// try to append the batch; if the append fails, the batch is placed on the deck
metrics("CpuReaderBatches") += 1
!impl.tryAppendBatch(veloxIter.next())
} else {
srcExhausted = true
true
}
if (needFlush) {
metrics("CoalescedBatches") += 1
val rapidsHostBatch = impl.flush()
// It is essential to check and tidy up the deck right after flushing, because the next
// call of veloxIter.hasNext may release the batch whose reference the deck still holds.
if (impl.isDeckFilled) {
impl.setupTargetVectors()
}
return rapidsHostBatch
}
}
if (converterImpl.isEmpty) {
val converter = NativeConverter(
veloxIter.next(),
targetBatchSizeInBytes, schema, converterMetrics
)
converterImpl = Some(converter)
}
}

throw new RuntimeException("should NOT reach this line")
}
}

}

object CoalesceConvertIterator extends Logging {

def hostToDevice(hostIter: Iterator[Array[RapidsHostColumn]],
outputAttr: Seq[Attribute],
metrics: Map[String, GpuMetric]): Iterator[ColumnarBatch] = {
val dataTypes = outputAttr.map(_.dataType).toArray

hostIter.map { hostVectors =>
Option(TaskContext.get()).foreach { ctx =>
GpuSemaphore.tryAcquire(ctx) match {
case AcquireFailed(_) =>
withResource(new NvtxWithMetrics("gpuAcquireC2C", NvtxColor.GREEN,
metrics("GpuAcquireTime"))) { _ =>
GpuSemaphore.acquireIfNecessary(ctx)
}
case _ =>
}
}

val deviceVectors: Array[ColumnVector] = hostVectors.zip(dataTypes).safeMap {
case (RapidsHostColumn(hcv, isPinned, totalBytes), dt) =>
val nvtxMetric = if (isPinned) {
metrics("PinnedH2DSize") += totalBytes
new NvtxWithMetrics("pinnedH2D", NvtxColor.DARK_GREEN, metrics("PinnedH2DTime"))
} else {
metrics("PageableH2DSize") += totalBytes
new NvtxWithMetrics("PageableH2D", NvtxColor.GREEN, metrics("PageableH2DTime"))
}
withResource(hcv) { _ =>
withResource(nvtxMetric) { _ =>
GpuColumnVector.from(hcv.copyToDevice(), dt)
}
}
}

new ColumnarBatch(deviceVectors, hostVectors.head.vector.getRowCount.toInt)
}
}

}