Avoid concatenating multiple host buffers when reading Parquet (#11911)
Depends upon rapidsai/cudf#17673.

Updates the multithreaded Parquet reader to leverage the new multiple
host buffers reader interface. This removes the need to concatenate
multiple host buffers into a single buffer before decoding the data via
the GPU. This also makes it easier to accept "late arrivals" in the
multithreaded combine reader after waking up with the GPU semaphore,
since we only need to fabricate a new footer to accommodate the
additional buffers in the read.
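
As a rough sketch of what this buys (illustrative only: the concatenating "before" path uses the real ai.rapids.cudf.HostMemoryBuffer API, while readSingle and readMulti are hypothetical stand-ins for the reader entry points; the actual multi-buffer read interface lives in rapidsai/cudf#17673):

import ai.rapids.cudf.HostMemoryBuffer

object MultiBufferReadSketch {
  // Old path (sketch): copy every host buffer into one combined allocation,
  // then decode from that single buffer on the GPU.
  def concatThenRead(parts: Array[HostMemoryBuffer])
      (readSingle: HostMemoryBuffer => Unit): Unit = {
    val total = parts.map(_.getLength).sum
    val combined = HostMemoryBuffer.allocate(total)
    try {
      var offset = 0L
      parts.foreach { p =>
        combined.copyFromHostBuffer(offset, p, 0L, p.getLength)
        offset += p.getLength
      }
      readSingle(combined)
    } finally {
      combined.close()
    }
  }

  // New path (sketch): hand the buffers to the reader as-is; no combined copy,
  // so a late-arriving buffer only needs to be appended to the array (plus a
  // refreshed footer) rather than triggering another concatenation.
  def readDirectly(parts: Array[HostMemoryBuffer])
      (readMulti: Array[HostMemoryBuffer] => Unit): Unit = {
    readMulti(parts)
  }
}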

---------

Signed-off-by: Jason Lowe <[email protected]>
jlowe authored Jan 14, 2025
1 parent c3029d8 commit 03b85b1
Showing 6 changed files with 144 additions and 152 deletions.
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -65,6 +65,28 @@ object DumpUtils extends Logging {
     }
   }

+  def dumpBuffer(
+      conf: Configuration,
+      data: Array[HostMemoryBuffer],
+      prefix: String,
+      suffix: String): String = {
+    try {
+      val (out, path) = FileUtils.createTempFile(conf, prefix, suffix)
+      withResource(out) { _ =>
+        data.foreach { hmb =>
+          withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in =>
+            IOUtils.copy(in, out)
+          }
+        }
+      }
+      path.toString
+    } catch {
+      case e: Exception =>
+        log.error(s"Error attempting to dump data", e)
+        s"<error writing data $e>"
+    }
+  }
+
   /**
    * Debug utility to dump columnar batch to parquet file. <br>
    * It's running on GPU. Parquet column names are generated from columnar batch type info. <br>
@@ -29,7 +29,7 @@ import scala.language.implicitConversions
 import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Table}
 import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
 import com.nvidia.spark.rapids.GpuMetric.{BUFFER_TIME, FILTER_TIME}
-import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
+import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, AutoCloseableProducingSeq}
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -53,13 +53,13 @@ import org.apache.spark.util.SerializableConfiguration
  * This contains a single HostMemoryBuffer along with other metadata needed
  * for combining the buffers before sending to GPU.
  */
-case class SingleHMBAndMeta(hmb: HostMemoryBuffer, bytes: Long, numRows: Long,
+case class SingleHMBAndMeta(hmbs: Array[HostMemoryBuffer], bytes: Long, numRows: Long,
     blockMeta: Seq[DataBlockBase])

 object SingleHMBAndMeta {
   // Contains no data but could have number of rows for things like count().
   def empty(numRows: Long = 0): SingleHMBAndMeta = {
-    SingleHMBAndMeta(null.asInstanceOf[HostMemoryBuffer], 0, numRows, Seq.empty)
+    SingleHMBAndMeta(Array.empty, 0, numRows, Seq.empty)
   }
 }

@@ -675,9 +675,7 @@ abstract class MultiFileCloudPartitionReaderBase(
   private def closeCurrentFileHostBuffers(): Unit = {
     currentFileHostBuffers.foreach { current =>
       current.memBuffersAndSizes.foreach { hbInfo =>
-        if (hbInfo.hmb != null) {
-          hbInfo.hmb.close()
-        }
+        hbInfo.hmbs.safeClose()
       }
     }
     currentFileHostBuffers = None
@@ -692,9 +690,7 @@
     tasks.asScala.foreach { task =>
       if (task.isDone()) {
         task.get.memBuffersAndSizes.foreach { hmbInfo =>
-          if (hmbInfo.hmb != null) {
-            hmbInfo.hmb.close()
-          }
+          hmbInfo.hmbs.safeClose()
         }
       } else {
         // Note we are not interrupting thread here so it
@@ -1009,9 +1005,9 @@ abstract class MultiFileCoalescingPartitionReaderBase(
     if (currentChunkMeta.currentChunk.isEmpty) {
       CachedGpuBatchIterator(EmptyTableReader, colTypes)
     } else {
-      val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk,
+      val dataBuffer = readPartFiles(currentChunkMeta.currentChunk,
         currentChunkMeta.clippedSchema)
-      if (dataSize == 0) {
+      if (dataBuffer.getLength == 0) {
         dataBuffer.close()
         CachedGpuBatchIterator(EmptyTableReader, colTypes)
       } else {
@@ -1020,8 +1016,8 @@
           // We don't want to actually close the host buffer until we know that we don't
           // want to retry more, so offset the close for now.
           dataBuffer.incRefCount()
-          val tableReader = readBufferToTablesAndClose(dataBuffer,
-            dataSize, currentChunkMeta.clippedSchema, currentChunkMeta.readSchema,
+          val tableReader = readBufferToTablesAndClose(dataBuffer, dataBuffer.getLength,
+            currentChunkMeta.clippedSchema, currentChunkMeta.readSchema,
             currentChunkMeta.extraInfo)
           CachedGpuBatchIterator(tableReader, colTypes)
         }
@@ -1042,12 +1038,11 @@
    * Read all data blocks into HostMemoryBuffer
    * @param blocks a sequence of data blocks to be read
    * @param clippedSchema the clipped schema is used to calculate the estimated output size
-   * @return (HostMemoryBuffer, Long)
-   *         the HostMemoryBuffer and its data size
+   * @return the HostMemoryBuffer
    */
   private def readPartFiles(
       blocks: Seq[(Path, DataBlockBase)],
-      clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = {
+      clippedSchema: SchemaBase): HostMemoryBuffer = {

     withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
         metrics("bufferTime"))) { _ =>
@@ -1138,7 +1133,9 @@
       }
       logDebug(s"$getFileFormatShortName Coalescing reading estimates the initTotalSize:" +
         s" $initTotalSize, and the true size: $finalBufferSize")
-      (finalBuffer, finalBufferSize)
+      withResource(finalBuffer) { _ =>
+        finalBuffer.slice(0, finalBufferSize)
+      }
     }
   }

25 changes: 13 additions & 12 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -2105,12 +2105,12 @@ class MultiFileCloudOrcPartitionReader(
           val (hostBuf, bufSize) = readPartFile(ctx, blocksToRead)
           val numRows = blocksToRead.map(_.infoBuilder.getNumberOfRows).sum
           val metas = blocksToRead.map(b => OrcDataStripe(OrcStripeWithMeta(b, ctx)))
-          hostBuffers += SingleHMBAndMeta(hostBuf, bufSize, numRows, metas)
+          hostBuffers += SingleHMBAndMeta(Array(hostBuf), bufSize, numRows, metas)
         }
         val bytesRead = fileSystemBytesRead() - startingBytesRead
         if (isDone) {
           // got close before finishing
-          hostBuffers.foreach(_.hmb.safeClose())
+          hostBuffers.flatMap(_.hmbs).safeClose()
           logDebug("Reader is closed, return empty buffer for the current read for " +
             s"file: ${partFile.filePath.toString}")
           HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema)
@@ -2123,7 +2123,7 @@
         }
       } catch {
         case e: Throwable =>
-          hostBuffers.foreach(_.hmb.safeClose())
+          hostBuffers.flatMap(_.hmbs).safeClose(e)
           throw e
       }
       val bufferTime = System.nanoTime() - bufferTimeStart
@@ -2222,9 +2222,10 @@ class MultiFileCloudOrcPartitionReader(
       case buffer: HostMemoryBuffersWithMetaData =>
         val memBuffersAndSize = buffer.memBuffersAndSizes
         val hmbInfo = memBuffersAndSize.head
-        val batchIter = readBufferToBatches(hmbInfo.hmb, hmbInfo.bytes, buffer.updatedReadSchema,
-          buffer.requestedMapping, filterHandler.isCaseSensitive, buffer.partitionedFile,
-          buffer.allPartValues)
+        require(hmbInfo.hmbs.length == 1)
+        val batchIter = readBufferToBatches(hmbInfo.hmbs.head, hmbInfo.bytes,
+          buffer.updatedReadSchema, buffer.requestedMapping, filterHandler.isCaseSensitive,
+          buffer.partitionedFile, buffer.allPartValues)
         if (memBuffersAndSize.length > 1) {
           val updatedBuffers = memBuffersAndSize.drop(1)
           currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers))
@@ -2314,9 +2315,10 @@ class MultiFileCloudOrcPartitionReader(
     toCombine.foreach { hmbWithMeta =>
       hmbWithMeta.memBuffersAndSizes.foreach { buf =>
         val dataCopyAmount = buf.blockMeta.map(_.getBlockSize).sum
-        if (dataCopyAmount > 0 && buf.hmb != null) {
+        if (dataCopyAmount > 0 && buf.hmbs.nonEmpty) {
+          require(buf.hmbs.length == 1)
           combinedBuf.copyFromHostBuffer(
-            offset, buf.hmb, OrcTools.ORC_MAGIC.length, dataCopyAmount)
+            offset, buf.hmbs.head, OrcTools.ORC_MAGIC.length, dataCopyAmount)
         }
         // update the offset for each stripe
         var stripeOffset = offset
@@ -2325,9 +2327,7 @@
           stripeOffset += block.getBlockSize
         }
         offset += dataCopyAmount
-        if (buf.hmb != null) {
-          buf.hmb.close()
-        }
+        buf.hmbs.safeClose()
         allOutputStripes ++= buf.blockMeta.map(_.stripe)
       }
     }
@@ -2364,7 +2364,8 @@

     // e: Create the new meta for the combined buffer
     val numRows = combinedMeta.allPartValues.map(_._1).sum
-    val combinedRet = SingleHMBAndMeta(maybeNewBuf, outStream.getPos, numRows, blockMetas)
+    val combinedRet = SingleHMBAndMeta(Array(maybeNewBuf), outStream.getPos, numRows,
+      blockMetas)
     val newHmbWithMeta = metaToUse.copy(
       memBuffersAndSizes = Array(combinedRet),
       allPartValues = Some(combinedMeta.allPartValues))