Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce hybrid (CPU) scan for Parquet read [databricks] #11720

Open
wants to merge 34 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
65de010
Merge C2C code to main
Nov 12, 2024
e6cede2
Update the dependencies in pom.xml
Nov 13, 2024
1e4fc13
revert BD velox hdfs code
Nov 15, 2024
46e19df
fit codes into the new HybridScan hierarchy
sperlingxx Nov 18, 2024
4f2a4d6
refine QueryPlan, RapidsMeta and test suites for HybridScan
sperlingxx Nov 20, 2024
4d52f90
Integrate Hybrid plugin; update IT
Nov 25, 2024
c82eb29
Make Hybrid jar provoided scope; Update shim to only applicable for S…
Dec 4, 2024
65b585a
Fix comments
Dec 4, 2024
d214739
Code comment update, a minor change
Dec 5, 2024
e0f1e3b
Fix shim logic
Dec 5, 2024
6331ab8
Fix shims: build for all shims, but report error when Spark is CDH or…
Dec 6, 2024
b1b8481
Remove useless shim code
Dec 9, 2024
dbae63f
IT: add tests for decimal types
Dec 9, 2024
5e972d6
Add checks for Java/Scala version, only supports Java 1.8 and Scala 2.12
Dec 9, 2024
c6fa249
Check datasource is v1
Dec 10, 2024
e95e7cc
Update test case: skip if runtime Spark is Databricks
Dec 11, 2024
092dab8
Update Hybrid Config doc: not all Spark versions are fully tested, on…
Dec 11, 2024
519f33c
Merge branch 'branch-25.02' into merge-c2c
Dec 11, 2024
b3b6f80
some refinement
sperlingxx Dec 13, 2024
f0921a4
fix tests && unsupported types
sperlingxx Dec 17, 2024
dd5d8f9
Add doc for Hybrid execution feature
Dec 23, 2024
6149589
Update doc
Dec 24, 2024
114b93a
Check Hybrid jar in executor
Dec 24, 2024
36d3cdf
Update hybrid-execution.md
winningsix Dec 25, 2024
5bfd763
Remove check for Java versions
Dec 25, 2024
275fa3d
Fix scala 2.13 check failure
Dec 25, 2024
cbb5609
Fix for Scala 2.13 building
Dec 30, 2024
c1df7c4
Fix: specify default value for loadBackend to avoid exception
Jan 13, 2025
99602c5
Update Copyright to add new year 2025
Jan 13, 2025
3d3b172
Fix Databricks building
Jan 14, 2025
d718d8c
Minor format change
Jan 14, 2025
4e79c2b
Merge branch 'branch-25.02' into merge-c2c
Jan 14, 2025
aae45b9
Add shim 354 for Hybrid feature
Jan 14, 2025
4fd5fdb
Fix Databricks building
Jan 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions integration_tests/run_pyspark_from_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,22 @@ EOF
fi
export PYSP_TEST_spark_rapids_memory_gpu_allocSize=${PYSP_TEST_spark_rapids_memory_gpu_allocSize:-'1536m'}

# Turns on $LOAD_HYBRID_BACKEND and setup the filepath of hybrid backend jars, to activate the
# hybrid backend while running subsequent integration tests.
if [[ "$LOAD_HYBRID_BACKEND" -eq 1 ]]; then
if [ -z "${HYBRID_BACKEND_JARS}" ]; then
echo "Error: Environment HYBRID_BACKEND_JARS is not set."
exit 1
fi
export PYSP_TEST_spark_jars="${PYSP_TEST_spark_jars},${HYBRID_BACKEND_JARS//:/,}"
export PYSP_TEST_spark_rapids_sql_parquet_useHybridReader=true
export PYSP_TEST_spark_rapids_sql_hybrid_loadBackend=true
export PYSP_TEST_spark_memory_offHeap_enabled=true
export PYSP_TEST_spark_memory_offHeap_size=512M
export PYSP_TEST_spark_rapids_sql_hybrid_load=true
export PYSP_TEST_spark_gluten_loadLibFromJar=true
fi

SPARK_SHELL_SMOKE_TEST="${SPARK_SHELL_SMOKE_TEST:-0}"
if [[ "${SPARK_SHELL_SMOKE_TEST}" != "0" ]]; then
echo "Running spark-shell smoke test..."
Expand Down
1 change: 1 addition & 0 deletions integration_tests/src/main/python/marks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@
pyarrow_test = pytest.mark.pyarrow_test
datagen_overrides = pytest.mark.datagen_overrides
tz_sensitive_test = pytest.mark.tz_sensitive_test
hybrid_test = pytest.mark.hybrid_test
90 changes: 90 additions & 0 deletions integration_tests/src/main/python/parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1659,3 +1659,93 @@ def setup_table(spark):
with_cpu_session(lambda spark: setup_table(spark))
assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).select("p"),
conf={"spark.rapids.sql.columnSizeBytes": "100"})

"""
Hybrid Scan:
1. DecimalType is NOT fully supported
revans2 marked this conversation as resolved.
Show resolved Hide resolved
2. TimestampType can NOT be the KeyType of MapType
3. NestedMap is disabled because it may produce incorrect result (usually occurring when table is very small)
"""
hybrid_gens = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, date_gen,
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)),
decimal_gen_32bit, decimal_gen_64bit, decimal_gen_128bit,
ArrayGen(byte_gen),
ArrayGen(long_gen), ArrayGen(string_gen), ArrayGen(date_gen),
ArrayGen(TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))),
ArrayGen(ArrayGen(byte_gen)),
StructGen([['child0', ArrayGen(byte_gen)], ['child1', byte_gen], ['child2', float_gen],
['child3', decimal_gen_64bit]]),
ArrayGen(StructGen([['child0', string_gen], ['child1', double_gen], ['child2', int_gen]]))
],
[ArrayGen(decimal_gen_32bit)],
[ArrayGen(decimal_gen_64bit)],
# [ArrayGen(decimal_gen_128bit) ], TODO: has bug
[
MapGen(DecimalGen(precision=7, scale=3, nullable=False), DecimalGen(precision=7, scale=3)),
MapGen(DecimalGen(precision=12, scale=2, nullable=False), DecimalGen(precision=12, scale=2)),
# MapGen(DecimalGen(precision=20, scale=2, nullable=False), DecimalGen(precision=20, scale=2)), TODO: has bug
],
[MapGen(f(nullable=False), f()) for f in [
BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen]
],
[simple_string_to_string_map_gen,
MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen),
max_length=10),
MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10),
# TODO: It seems that Hybrid Parquet Scan (underlying) can NOT handle nested Map correctly
# MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)
],
]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some tests to validate that predicate push down and filtering is working correctly? It would be nice to have

  1. simple filters
  2. complex filters that are not supported by normal parquet predicate push down. (like the ors at the top level instead of ands)
  3. filters that have operators in them that velox does not support, but spark rapids does.

Copy link
Collaborator Author

@res-life res-life Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed internally before, the decision is putting into a follow-up PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up issue filed: #11892

@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('parquet_gens', hybrid_gens, ids=idfn)
@pytest.mark.parametrize('gen_rows', [20, 100, 512, 1024, 4096], ids=idfn)
@hybrid_test
def test_parquet_read_round_trip_hybrid(spark_tmp_path, parquet_gens, gen_rows):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
conf=rebase_write_corrected_conf)

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
conf={
'spark.sql.sources.useV1SourceList': 'parquet',
'spark.rapids.sql.parquet.useHybridReader': 'true',
})

# Creating scenarios in which CoalesceConverter will coalesce several input batches by adjusting
# reader_batch_size and coalesced_batch_size, tests if the CoalesceConverter functions correctly
# when coalescing is needed.
@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
@pytest.mark.parametrize('reader_batch_size', [512, 1024, 2048], ids=idfn)
@pytest.mark.parametrize('coalesced_batch_size', [1 << 25, 1 << 27], ids=idfn)
@pytest.mark.parametrize('gen_rows', [8192, 10000], ids=idfn)
@hybrid_test
def test_parquet_read_round_trip_hybrid_multiple_batches(spark_tmp_path,
reader_batch_size,
coalesced_batch_size,
gen_rows):
gens = []
for g in hybrid_gens:
gens.extend(g)

gen_list = [('_c' + str(i), gen) for i, gen in enumerate(gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark: gen_df(spark, gen_list, length=gen_rows).write.parquet(data_path),
conf=rebase_write_corrected_conf)

assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.read.parquet(data_path),
conf={
'spark.sql.sources.useV1SourceList': 'parquet',
'spark.rapids.sql.parquet.useHybridReader': 'true',
'spark.gluten.sql.columnar.maxBatchSize': reader_batch_size,
'spark.rapids.sql.batchSizeBytes': coalesced_batch_size,
})
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,6 @@ def is_hive_available():
if is_at_least_precommit_run():
return True
return _spark.conf.get("spark.sql.catalogImplementation") == "hive"

def is_hybrid_backend_loaded():
return _spark.conf.get("spark.rapids.sql.hybrid.loadBackend") == "true"
7 changes: 7 additions & 0 deletions sql-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-hybrid_${scala.binary.version}</artifactId>
revans2 marked this conversation as resolved.
Show resolved Hide resolved
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<!-- #if scala-2.13 --><!--
<profiles>
Expand Down
21 changes: 17 additions & 4 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Plugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -343,16 +343,29 @@ object RapidsPluginUtils extends Logging {
}
}

/**
* Find spark-rapids-extra-plugins files, and create plugin instances by reflection.
* Note: If Hybrid jar is not in the classpath, then will not create Hybrid plugin.
* @return plugin instances defined in spark-rapids-extra-plugins files.
*/
private def getExtraPlugins: Seq[SparkPlugin] = {
val resourceName = "spark-rapids-extra-plugins"
val classLoader = RapidsPluginUtils.getClass.getClassLoader
val resource = classLoader.getResourceAsStream(resourceName)
if (resource == null) {
val resourceUrls = classLoader.getResources(resourceName)
val resourceUrlArray = resourceUrls.asScala.toArray

if (resourceUrlArray.isEmpty) {
logDebug(s"Could not find file $resourceName in the classpath, not loading extra plugins")
Seq.empty
} else {
val pluginClasses = scala.io.Source.fromInputStream(resource).getLines().toSeq
loadExtensions(classOf[SparkPlugin], pluginClasses)
val plugins = scala.collection.mutable.ListBuffer[SparkPlugin]()
for (resourceUrl <- resourceUrlArray) {
val source = scala.io.Source.fromURL(resourceUrl)
val pluginClasses = source.getLines().toList
source.close()
plugins ++= loadExtensions(classOf[SparkPlugin], pluginClasses)
}
plugins
}
}

Expand Down
21 changes: 21 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1721,6 +1721,23 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(false)

val HYBRID_PARQUET_READER = conf("spark.rapids.sql.parquet.useHybridReader")
.doc("Use HybridScan to read Parquet data using CPUs. The underlying implementation " +
"leverages both Gluten and Velox. Supports Spark 3.2.2, 3.3.1, 3.4.2, and 3.5.1 " +
"as Gluten does, also supports other versions but not fully tested.")
.internal()
.booleanConf
.createWithDefault(false)

// This config name is the same as HybridPluginWrapper in Hybrid jar,
// can not refer to Hybrid jar because of the jar is optional.
val LOAD_HYBRID_BACKEND = conf("spark.rapids.sql.hybrid.loadBackend")
.doc("Load hybrid backend as an extra plugin of spark-rapids during launch time")
.internal()
.startupOnly()
revans2 marked this conversation as resolved.
Show resolved Hide resolved
.booleanConf
.createWithDefault(false)

val HASH_AGG_REPLACE_MODE = conf("spark.rapids.sql.hashAgg.replaceMode")
.doc("Only when hash aggregate exec has these modes (\"all\" by default): " +
"\"all\" (try to replace all aggregates, default), " +
Expand Down Expand Up @@ -2895,6 +2912,10 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val avroDebugDumpAlways: Boolean = get(AVRO_DEBUG_DUMP_ALWAYS)

lazy val useHybridParquetReader: Boolean = get(HYBRID_PARQUET_READER)

lazy val loadHybridBackend: Boolean = get(LOAD_HYBRID_BACKEND)
GaryShen2008 marked this conversation as resolved.
Show resolved Hide resolved

lazy val hashAggReplaceMode: String = get(HASH_AGG_REPLACE_MODE)

lazy val partialMergeDistinctEnabled: Boolean = get(PARTIAL_MERGE_DISTINCT_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rapids.hybrid

import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.{AcquireFailed, GpuColumnVector, GpuMetric, GpuSemaphore, NvtxWithMetrics}
import com.nvidia.spark.rapids.Arm._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.hybrid.{CoalesceBatchConverter => NativeConverter}
import com.nvidia.spark.rapids.hybrid.RapidsHostColumn

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* The Iterator wrapper of the underlying NativeConverter which produces the coalesced Batch of
* HostColumnVectors. The iterator produces RapidsHostColumn instead of HostColumnVector for
* carrying metadata about [Pinned|Pageable]MemCpy which are displayed as Spark SQL Metrics.
*/
class CoalesceConvertIterator(cpuScanIter: Iterator[ColumnarBatch],
targetBatchSizeInBytes: Long,
schema: StructType,
metrics: Map[String, GpuMetric])
extends Iterator[Array[RapidsHostColumn]] with Logging {

private var converterImpl: NativeConverter = _

private var srcExhausted = false

private val converterMetrics = Map(
"C2COutputSize" -> GpuMetric.unwrap(metrics("C2COutputSize")))

override def hasNext(): Boolean = {
// isDeckFilled means if there is unconverted source data remained on the deck.
// hasProceedingBuilders means if there exists working target vectors not being flushed yet.
val selfHoldData = Option(converterImpl).exists { c =>
c.isDeckFilled || c.hasProceedingBuilders
}
// Check the srcExhausted at first, so as to minimize the potential cost of unnecessary call of
// prev.hasNext
lazy val upstreamHoldData = !srcExhausted && cpuScanIter.hasNext
// Either converter holds data or upstreaming iterator holds data.
if (selfHoldData || upstreamHoldData) {
return true
}

if (!srcExhausted) {
srcExhausted = true
}
// Close the native Converter and dump column-level metrics for performance inspection.
Option(converterImpl).foreach { c =>
// VeloxBatchConverter collects the eclipsedTime of C2C_Conversion by itself.
// Here we fetch the final value before closing it.
metrics("C2CTime") += c.eclipsedNanoSecond
// release the native instance when upstreaming iterator has been exhausted
val detailedMetrics = c.close()
val tID = TaskContext.get().taskAttemptId()
logInfo(s"task[$tID] CoalesceNativeConverter finished:\n$detailedMetrics")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does this need to be at the info level?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

converterImpl = null
}
false
}

override def next(): Array[RapidsHostColumn] = {
require(!srcExhausted, "Please call hasNext in previous to ensure there are remaining data")

// Initialize the nativeConverter with the first input batch
if (converterImpl == null) {
converterImpl = NativeConverter(
cpuScanIter.next(),
targetBatchSizeInBytes,
schema,
converterMetrics
)
}

// Keeps consuming input batches of cpuScanIter until targetVectors reaches `targetBatchSize`
// or cpuScanIter being exhausted.
while (true) {
val needFlush = if (cpuScanIter.hasNext) {
metrics("CpuReaderBatches") += 1
// The only condition leading to a nonEmpty deck is targetVectors are unset after
// the previous flushing
if (converterImpl.isDeckFilled) {
converterImpl.setupTargetVectors()
}
// tryAppendBatch, if failed which indicates the remaining space of targetVectors is NOT
// enough the current input batch, then the batch will be placed on the deck and trigger
// the flush of working targetVectors
!converterImpl.tryAppendBatch(cpuScanIter.next())
} else {
// If cpuScanIter is exhausted, then flushes targetVectors as the last output item.
srcExhausted = true
true
}
if (needFlush) {
metrics("CoalescedBatches") += 1
val rapidsHostBatch = converterImpl.flush()
// It is essential to check and tidy up the deck right after flushing. Because if
// the next call of cpuScanIter.hasNext will release the batch which the deck holds
// its reference.
if (converterImpl.isDeckFilled) {
converterImpl.setupTargetVectors()
}
return rapidsHostBatch
}
}

throw new RuntimeException("should NOT reach this line")
}
}

object CoalesceConvertIterator extends Logging {

def hostToDevice(hostIter: Iterator[Array[RapidsHostColumn]],
outputAttr: Seq[Attribute],
metrics: Map[String, GpuMetric]): Iterator[ColumnarBatch] = {
val dataTypes = outputAttr.map(_.dataType).toArray

hostIter.map { hostVectors =>
Option(TaskContext.get()).foreach { ctx =>
GpuSemaphore.tryAcquire(ctx) match {
revans2 marked this conversation as resolved.
Show resolved Hide resolved
case AcquireFailed(_) =>
withResource(new NvtxWithMetrics("gpuAcquireC2C", NvtxColor.GREEN,
metrics("GpuAcquireTime"))) { _ =>
GpuSemaphore.acquireIfNecessary(ctx)
}
case _ =>
}
}

val deviceVectors: Array[ColumnVector] = hostVectors.zip(dataTypes).safeMap {
case (RapidsHostColumn(hcv, isPinned, totalBytes), dt) =>
val nvtxMetric = if (isPinned) {
metrics("PinnedH2DSize") += totalBytes
new NvtxWithMetrics("pinnedH2D", NvtxColor.DARK_GREEN, metrics("PinnedH2DTime"))
} else {
metrics("PageableH2DSize") += totalBytes
new NvtxWithMetrics("PageableH2D", NvtxColor.GREEN, metrics("PageableH2DTime"))
}
withResource(hcv) { _ =>
withResource(nvtxMetric) { _ =>
GpuColumnVector.from(hcv.copyToDevice(), dt)
}
}
}

new ColumnarBatch(deviceVectors, hostVectors.head.vector.getRowCount.toInt)
}
}

}
Loading
Loading