Introduce hybrid (CPU) scan for Parquet read [databricks] #11720

Merged
34 commits merged on Jan 16, 2025

Changes from 1 commit

Commits (34)
65de010
Merge C2C code to main
Nov 12, 2024
e6cede2
Update the dependencies in pom.xml
Nov 13, 2024
1e4fc13
revert BD velox hdfs code
Nov 15, 2024
46e19df
fit codes into the new HybridScan hierarchy
sperlingxx Nov 18, 2024
4f2a4d6
refine QueryPlan, RapidsMeta and test suites for HybridScan
sperlingxx Nov 20, 2024
4d52f90
Integrate Hybrid plugin; update IT
Nov 25, 2024
c82eb29
Make Hybrid jar provided scope; Update shim to only applicable for S…
Dec 4, 2024
65b585a
Fix comments
Dec 4, 2024
d214739
Code comment update, a minor change
Dec 5, 2024
e0f1e3b
Fix shim logic
Dec 5, 2024
6331ab8
Fix shims: build for all shims, but report error when Spark is CDH or…
Dec 6, 2024
b1b8481
Remove useless shim code
Dec 9, 2024
dbae63f
IT: add tests for decimal types
Dec 9, 2024
5e972d6
Add checks for Java/Scala version, only supports Java 1.8 and Scala 2.12
Dec 9, 2024
c6fa249
Check datasource is v1
Dec 10, 2024
e95e7cc
Update test case: skip if runtime Spark is Databricks
Dec 11, 2024
092dab8
Update Hybrid Config doc: not all Spark versions are fully tested, on…
Dec 11, 2024
519f33c
Merge branch 'branch-25.02' into merge-c2c
Dec 11, 2024
b3b6f80
some refinement
sperlingxx Dec 13, 2024
f0921a4
fix tests && unsupported types
sperlingxx Dec 17, 2024
dd5d8f9
Add doc for Hybrid execution feature
Dec 23, 2024
6149589
Update doc
Dec 24, 2024
114b93a
Check Hybrid jar in executor
Dec 24, 2024
36d3cdf
Update hybrid-execution.md
winningsix Dec 25, 2024
5bfd763
Remove check for Java versions
Dec 25, 2024
275fa3d
Fix scala 2.13 check failure
Dec 25, 2024
cbb5609
Fix for Scala 2.13 building
Dec 30, 2024
c1df7c4
Fix: specify default value for loadBackend to avoid exception
Jan 13, 2025
99602c5
Update Copyright to add new year 2025
Jan 13, 2025
3d3b172
Fix Databricks building
Jan 14, 2025
d718d8c
Minor format change
Jan 14, 2025
4e79c2b
Merge branch 'branch-25.02' into merge-c2c
Jan 14, 2025
aae45b9
Add shim 354 for Hybrid feature
Jan 14, 2025
4fd5fdb
Fix Databricks building
Jan 14, 2025
revert BD velox hdfs code
Chong Gao committed Nov 25, 2024

commit 1e4fc13bdd6b3a7c06dc897ab7caf9c896588666
3 changes: 0 additions & 3 deletions integration_tests/run_pyspark_from_build.sh
@@ -374,9 +374,6 @@ EOF
export PYSP_TEST_spark_memory_offHeap_size=512M
export PYSP_TEST_spark_gluten_loadLibFromJar=true
export PYSP_TEST_spark_rapids_sql_loadVelox=true
if [[ "$VELOX_HDFS_TEST" -eq 1 ]]; then
export PYSP_TEST_spark_rapids_sql_velox_useVeloxHDFS=true
fi
fi

SPARK_SHELL_SMOKE_TEST="${SPARK_SHELL_SMOKE_TEST:-0}"
Changes in a second file:
@@ -21,6 +21,7 @@ import java.net.URI
import java.nio.ByteBuffer
import java.nio.channels.SeekableByteChannel
import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Locale}
import java.util.concurrent._

@@ -38,6 +39,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.filecache.FileCache
import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter}
import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl}
import com.nvidia.spark.rapids.velox.VeloxBackendApis
import org.apache.commons.io.IOUtils
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
import org.apache.hadoop.conf.Configuration
@@ -57,7 +59,6 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rapids.velox.{FileCopyRange, VeloxHDFS}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -464,6 +465,7 @@ private case class GpuParquetFileFilterHandler(

private val PARQUET_ENCRYPTION_CONFS = Seq("parquet.encryption.kms.client.class",
"parquet.encryption.kms.client.class", "parquet.crypto.factory.class")
private val PARQUET_MAGIC_ENCRYPTED = "PARE".getBytes(StandardCharsets.US_ASCII)

private def isParquetTimeInInt96(parquetType: Type): Boolean = {
parquetType match {
@@ -535,12 +537,8 @@ private case class GpuParquetFileFilterHandler(
private def readFooterBuffer(
filePath: Path,
conf: Configuration): HostMemoryBuffer = {
PerfIO.readParquetFooterBuffer(
filePath, conf, GpuParquetUtils.verifyParquetMagic)
.getOrElse(
VeloxHDFS.readParquetFooterBuffer(filePath, conf)
.getOrElse(readFooterBufUsingHadoop(filePath, conf))
)
PerfIO.readParquetFooterBuffer(filePath, conf, verifyParquetMagic)
.getOrElse(readFooterBufUsingHadoop(filePath, conf))
}

private def readFooterBufUsingHadoop(filePath: Path, conf: Configuration): HostMemoryBuffer = {
@@ -561,7 +559,7 @@ private case class GpuParquetFileFilterHandler(
val magic = new Array[Byte](MAGIC.length)
inputStream.readFully(magic)
val footerIndex = footerLengthIndex - footerLength
GpuParquetUtils.verifyParquetMagic(filePath, magic)
verifyParquetMagic(filePath, magic)
if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
throw new RuntimeException(s"corrupted file: the footer index is not within " +
s"the file: $footerIndex")
@@ -586,6 +584,21 @@ private case class GpuParquetFileFilterHandler(
}
}


private def verifyParquetMagic(filePath: Path, magic: Array[Byte]): Unit = {
if (!util.Arrays.equals(MAGIC, magic)) {
if (util.Arrays.equals(PARQUET_MAGIC_ENCRYPTED, magic)) {
throw new RuntimeException("The GPU does not support reading encrypted Parquet " +
"files. To read encrypted or columnar encrypted files, disable the GPU Parquet " +
s"reader via ${RapidsConf.ENABLE_PARQUET_READ.key}.")
} else {
throw new RuntimeException(s"$filePath is not a Parquet file. " +
s"Expected magic number at tail ${util.Arrays.toString(MAGIC)} " +
s"but found ${util.Arrays.toString(magic)}")
}
}
}

private def readAndFilterFooter(
file: PartitionedFile,
conf : Configuration,
@@ -647,15 +660,8 @@ private case class GpuParquetFileFilterHandler(
}
}
}.getOrElse {
VeloxHDFS.readParquetFooterBuffer(filePath, conf).map { hmb =>
withResource(hmb) { _ =>
ParquetFileReader.readFooter(new HMBInputFile(hmb),
ParquetMetadataConverter.range(file.start, file.start + file.length))
}
}.getOrElse {
ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
}
ParquetFileReader.readFooter(conf, filePath,
ParquetMetadataConverter.range(file.start, file.start + file.length))
}
}
}
@@ -1586,36 +1592,13 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
conf, out.buffer, filePath.toUri,
coalescedRanges.map(r => IntRangeWithOffset(r.offset, r.length, r.outputOffset))
).getOrElse {
// try to read data through VeloxHDFS if necessary
val streamHandle = VeloxHDFS.createInputFileStream(filePathString)
if (streamHandle > 0) {
// Builds ParquetCopyRange while computing total read size
val ranges = ArrayBuffer.empty[FileCopyRange]
val bufferAddr = out.buffer.getAddress
val readSize = coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
ranges += FileCopyRange(
blockCopy.offset,
blockCopy.length,
bufferAddr + blockCopy.outputOffset
)
out.seek(blockCopy.outputOffset + blockCopy.length)
acc + blockCopy.length
}
// leverage velox::HdfsReadFile to buffer Hdfs files (which based on libhdfs3)
VeloxHDFS.copyRangesFromFile(
filePathString, streamHandle, ranges,
closeAfterFinished = true
)
readSize
} else {
withResource(filePath.getFileSystem(conf).open(filePath)) { in =>
val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
coalescedRanges.foldLeft(0L) { (acc, blockCopy) =>
acc + copyDataRange(blockCopy, in, out, copyBuffer)
}
}
}
}
// try to cache the remote ranges that were copied
remoteCopies.foreach { range =>
metrics.getOrElse(GpuMetric.FILECACHE_DATA_RANGE_MISSES, NoopMetric) += 1
@@ -2741,6 +2724,12 @@ case class ParquetTableReader(
}

override def close(): Unit = {
debugDumpPrefix.foreach { prefix =>
if (debugDumpAlways) {
val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet")
logWarning(s"Wrote data for $splitsString to $p")
}
}
reader.close()
buffer.close()
}
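
The readFooterBuffer change above is the core of the revert: the three-level fallback (PerfIO, then VeloxHDFS, then the Hadoop API) shrinks back to two levels chained with Option.getOrElse. Below is a minimal sketch of the remaining shape, with hypothetical readViaPerfIO/readViaHadoop helpers standing in for PerfIO.readParquetFooterBuffer and readFooterBufUsingHadoop; it is illustrative only, not the plugin's actual code.

// Sketch only: the real methods return a HostMemoryBuffer and take a Hadoop
// Path and Configuration; plain types are used here to keep it self-contained.
object FooterReadSketch {
  // Hypothetical stand-in for PerfIO.readParquetFooterBuffer: returns None
  // when the accelerated I/O path is unavailable.
  def readViaPerfIO(path: String): Option[Array[Byte]] = None

  // Hypothetical stand-in for readFooterBufUsingHadoop: the always-available path.
  def readViaHadoop(path: String): Array[Byte] = Array.emptyByteArray

  // The two-level fallback kept by this commit, with the VeloxHDFS layer
  // removed: try the fast path, otherwise read through the Hadoop API.
  def readFooterBuffer(path: String): Array[Byte] =
    readViaPerfIO(path).getOrElse(readViaHadoop(path))
}

Keeping each source as an Option makes it cheap to add a layer or, as in this commit, drop one without touching the callers.
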
Changes in a third file:
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,14 +16,10 @@

package com.nvidia.spark.rapids

import java.nio.charset.StandardCharsets
import java.util
import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileWriter.MAGIC
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData, ColumnPath}
import org.apache.parquet.schema.MessageType

@@ -87,27 +83,4 @@ object GpuParquetUtils extends Logging {

block
}

/**
* Verify the Magic code stored in the Parquet Footer
*
* @param filePath the path of Parquet file
* @param magic the Magic code extracted from the file
*/
def verifyParquetMagic(filePath: Path, magic: Array[Byte]): Unit = {
if (!util.Arrays.equals(MAGIC, magic)) {
if (util.Arrays.equals(PARQUET_MAGIC_ENCRYPTED, magic)) {
throw new RuntimeException("The GPU does not support reading encrypted Parquet " +
"files. To read encrypted or columnar encrypted files, disable the GPU Parquet " +
s"reader via ${RapidsConf.ENABLE_PARQUET_READ.key}.")
} else {
throw new RuntimeException(s"$filePath is not a Parquet file. " +
s"Expected magic number at tail ${util.Arrays.toString(MAGIC)} " +
s"but found ${util.Arrays.toString(magic)}")
}
}
}

private val PARQUET_MAGIC_ENCRYPTED = "PARE".getBytes(StandardCharsets.US_ASCII)

}
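
For context on the magic bytes that verifyParquetMagic checks: a Parquet file ends with a 4-byte little-endian footer length followed by a 4-byte magic, "PAR1" for plain footers or "PARE" for encrypted ones. The following is a minimal, self-contained sketch of pulling that tail off a local file; the plugin itself reads it into a HostMemoryBuffer via PerfIO or the Hadoop API, so RandomAccessFile here is purely illustrative.

import java.io.RandomAccessFile
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

object ParquetTailSketch {
  val MAGIC: Array[Byte] = "PAR1".getBytes(StandardCharsets.US_ASCII)

  // Returns (footerLength, magic) read from the last 8 bytes of the file.
  def readTail(path: String): (Int, Array[Byte]) = {
    val raf = new RandomAccessFile(path, "r")
    try {
      raf.seek(raf.length() - 8)
      val tail = new Array[Byte](8)
      raf.readFully(tail)
      val footerLength =
        ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt
      (footerLength, tail.slice(4, 8))
    } finally {
      raf.close()
    }
  }
}

// Usage: the second element is what the verifyParquetMagic check compares
// against MAGIC (or against "PARE" to produce the encryption error above).
// val (footerLen, magic) = ParquetTailSketch.readTail("/tmp/example.parquet")
// java.util.Arrays.equals(magic, ParquetTailSketch.MAGIC)
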
Changes in a fourth file:
@@ -1696,7 +1696,7 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")

object VeloxFilterPushdownType extends Enumeration {
val ALL_SUPPORTED, NONE, UNCHANGED = Value
}
}

val PUSH_DOWN_FILTERS_TO_VELOX = conf("spark.rapids.sql.parquet.pushDownFiltersToVelox")
.doc("Push down all supported filters to Velox if set to ALL_SUPPORTED. " +
@@ -1722,13 +1722,6 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.integerConf
.createWithDefault(0)

val ENABLE_VELOX_HDFS = conf("spark.rapids.sql.velox.useVeloxHDFS")
.doc("Use HDFS reader of velox to do buffering instead of Hadoop Java API")
.internal()
.startupOnly()
.booleanConf
.createWithDefault(false)

val LOAD_VELOX = conf("spark.rapids.sql.loadVelox")
.doc("Load Velox (through Gluten) as a spark driver plugin")
.startupOnly()
@@ -2884,8 +2877,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val parquetVeloxPreloadCapacity: Int = get(PARQUET_VELOX_PRELOAD_CAP)

lazy val enableVeloxHDFS: Boolean = get(ENABLE_VELOX_HDFS)

lazy val loadVelox: Boolean = get(LOAD_VELOX)

lazy val hashAggReplaceMode: String = get(HASH_AGG_REPLACE_MODE)
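
With ENABLE_VELOX_HDFS and enableVeloxHDFS removed, LOAD_VELOX is the remaining switch in this hunk, and it is startupOnly(), so it has to be supplied when the application launches rather than toggled mid-session. A minimal launch sketch follows, assuming the RAPIDS plugin class com.nvidia.spark.SQLPlugin and an illustrative input path; the required jars and the rest of the plugin configuration are omitted.

import org.apache.spark.sql.SparkSession

object LoadVeloxLaunchSketch {
  def main(args: Array[String]): Unit = {
    // startupOnly() configs like spark.rapids.sql.loadVelox must be in place
    // before the SparkSession (and with it the plugin) is created.
    val spark = SparkSession.builder()
      .appName("hybrid-scan-sketch")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.sql.loadVelox", "true")
      .getOrCreate()

    // Any Parquet read now goes through the plugin's scan planning.
    spark.read.parquet("/data/example_table").show()

    spark.stop()
  }
}
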