Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid concatentating multiple host buffers when reading Parquet #11911

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,6 +65,28 @@ object DumpUtils extends Logging {
}
}

def dumpBuffer(
conf: Configuration,
data: Array[HostMemoryBuffer],
prefix: String,
suffix: String): String = {
try {
val (out, path) = FileUtils.createTempFile(conf, prefix, suffix)
withResource(out) { _ =>
data.foreach { hmb =>
withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in =>
IOUtils.copy(in, out)
}
}
}
path.toString
} catch {
case e: Exception =>
log.error(s"Error attempting to dump data", e)
s"<error writing data $e>"
}
}

/**
* Debug utility to dump columnar batch to parquet file. <br>
* It's running on GPU. Parquet column names are generated from columnar batch type info. <br>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@ import scala.language.implicitConversions
import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Table}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.GpuMetric.{BUFFER_TIME, FILTER_TIME}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, AutoCloseableProducingSeq}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
Expand All @@ -53,13 +53,13 @@ import org.apache.spark.util.SerializableConfiguration
* This contains a single HostMemoryBuffer along with other metadata needed
* for combining the buffers before sending to GPU.
*/
case class SingleHMBAndMeta(hmb: HostMemoryBuffer, bytes: Long, numRows: Long,
case class SingleHMBAndMeta(hmbs: Array[HostMemoryBuffer], bytes: Long, numRows: Long,
blockMeta: Seq[DataBlockBase])

object SingleHMBAndMeta {
// Contains no data but could have number of rows for things like count().
def empty(numRows: Long = 0): SingleHMBAndMeta = {
SingleHMBAndMeta(null.asInstanceOf[HostMemoryBuffer], 0, numRows, Seq.empty)
SingleHMBAndMeta(Array.empty, 0, numRows, Seq.empty)
}
}

Expand Down Expand Up @@ -715,9 +715,7 @@ abstract class MultiFileCloudPartitionReaderBase(
private def closeCurrentFileHostBuffers(): Unit = {
currentFileHostBuffers.foreach { current =>
current.memBuffersAndSizes.foreach { hbInfo =>
if (hbInfo.hmb != null) {
hbInfo.hmb.close()
}
hbInfo.hmbs.safeClose()
}
}
currentFileHostBuffers = None
Expand All @@ -732,9 +730,7 @@ abstract class MultiFileCloudPartitionReaderBase(
tasks.asScala.foreach { task =>
if (task.isDone()) {
task.get.memBuffersAndSizes.foreach { hmbInfo =>
if (hmbInfo.hmb != null) {
hmbInfo.hmb.close()
}
hmbInfo.hmbs.safeClose()
}
} else {
// Note we are not interrupting thread here so it
Expand Down Expand Up @@ -1049,9 +1045,9 @@ abstract class MultiFileCoalescingPartitionReaderBase(
if (currentChunkMeta.currentChunk.isEmpty) {
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk,
val dataBuffer = readPartFiles(currentChunkMeta.currentChunk,
currentChunkMeta.clippedSchema)
if (dataSize == 0) {
if (dataBuffer.getLength == 0) {
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
Expand All @@ -1060,8 +1056,8 @@ abstract class MultiFileCoalescingPartitionReaderBase(
// We don't want to actually close the host buffer until we know that we don't
// want to retry more, so offset the close for now.
dataBuffer.incRefCount()
val tableReader = readBufferToTablesAndClose(dataBuffer,
dataSize, currentChunkMeta.clippedSchema, currentChunkMeta.readSchema,
val tableReader = readBufferToTablesAndClose(dataBuffer, dataBuffer.getLength,
currentChunkMeta.clippedSchema, currentChunkMeta.readSchema,
currentChunkMeta.extraInfo)
CachedGpuBatchIterator(tableReader, colTypes)
}
Expand All @@ -1082,12 +1078,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
* Read all data blocks into HostMemoryBuffer
* @param blocks a sequence of data blocks to be read
* @param clippedSchema the clipped schema is used to calculate the estimated output size
* @return (HostMemoryBuffer, Long)
* the HostMemoryBuffer and its data size
* @return the HostMemoryBuffer
*/
private def readPartFiles(
blocks: Seq[(Path, DataBlockBase)],
clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = {
clippedSchema: SchemaBase): HostMemoryBuffer = {

withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
metrics("bufferTime"))) { _ =>
Expand Down Expand Up @@ -1178,7 +1173,9 @@ abstract class MultiFileCoalescingPartitionReaderBase(
}
logDebug(s"$getFileFormatShortName Coalescing reading estimates the initTotalSize:" +
s" $initTotalSize, and the true size: $finalBufferSize")
(finalBuffer, finalBufferSize)
withResource(finalBuffer) { _ =>
finalBuffer.slice(0, finalBufferSize)
}
}
}

Expand Down
27 changes: 14 additions & 13 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
* Copyright (c) 2019-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2105,12 +2105,12 @@ class MultiFileCloudOrcPartitionReader(
val (hostBuf, bufSize) = readPartFile(ctx, blocksToRead)
val numRows = blocksToRead.map(_.infoBuilder.getNumberOfRows).sum
val metas = blocksToRead.map(b => OrcDataStripe(OrcStripeWithMeta(b, ctx)))
hostBuffers += SingleHMBAndMeta(hostBuf, bufSize, numRows, metas)
hostBuffers += SingleHMBAndMeta(Array(hostBuf), bufSize, numRows, metas)
}
val bytesRead = fileSystemBytesRead() - startingBytesRead
if (isDone) {
// got close before finishing
hostBuffers.foreach(_.hmb.safeClose())
hostBuffers.foreach(_.hmbs.safeClose())
logDebug("Reader is closed, return empty buffer for the current read for " +
s"file: ${partFile.filePath.toString}")
HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema)
Expand All @@ -2123,7 +2123,7 @@ class MultiFileCloudOrcPartitionReader(
}
} catch {
case e: Throwable =>
hostBuffers.foreach(_.hmb.safeClose())
hostBuffers.foreach(_.hmbs.foreach(_.safeClose()))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not technically part of this PR, but couldn't safeClose throw and then we don't see e? We should probably also use the Seq variant of safeClose(e)

throw e
}
val bufferTime = System.nanoTime() - bufferTimeStart
Expand Down Expand Up @@ -2223,9 +2223,10 @@ class MultiFileCloudOrcPartitionReader(
case buffer: HostMemoryBuffersWithMetaData =>
val memBuffersAndSize = buffer.memBuffersAndSizes
val hmbInfo = memBuffersAndSize.head
val batchIter = readBufferToBatches(hmbInfo.hmb, hmbInfo.bytes, buffer.updatedReadSchema,
buffer.requestedMapping, filterHandler.isCaseSensitive, buffer.partitionedFile,
buffer.allPartValues)
require(hmbInfo.hmbs.length == 1)
val batchIter = readBufferToBatches(hmbInfo.hmbs.head, hmbInfo.bytes,
buffer.updatedReadSchema, buffer.requestedMapping, filterHandler.isCaseSensitive,
buffer.partitionedFile, buffer.allPartValues)
if (memBuffersAndSize.length > 1) {
val updatedBuffers = memBuffersAndSize.drop(1)
currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers))
Expand Down Expand Up @@ -2315,9 +2316,10 @@ class MultiFileCloudOrcPartitionReader(
toCombine.foreach { hmbWithMeta =>
hmbWithMeta.memBuffersAndSizes.foreach { buf =>
val dataCopyAmount = buf.blockMeta.map(_.getBlockSize).sum
if (dataCopyAmount > 0 && buf.hmb != null) {
if (dataCopyAmount > 0 && buf.hmbs.nonEmpty) {
require(buf.hmbs.length == 1)
combinedBuf.copyFromHostBuffer(
offset, buf.hmb, OrcTools.ORC_MAGIC.length, dataCopyAmount)
offset, buf.hmbs.head, OrcTools.ORC_MAGIC.length, dataCopyAmount)
}
// update the offset for each stripe
var stripeOffset = offset
Expand All @@ -2326,9 +2328,7 @@ class MultiFileCloudOrcPartitionReader(
stripeOffset += block.getBlockSize
}
offset += dataCopyAmount
if (buf.hmb != null) {
buf.hmb.close()
}
buf.hmbs.safeClose()
allOutputStripes ++= buf.blockMeta.map(_.stripe)
}
}
Expand Down Expand Up @@ -2365,7 +2365,8 @@ class MultiFileCloudOrcPartitionReader(

// e: Create the new meta for the combined buffer
val numRows = combinedMeta.allPartValues.map(_._1).sum
val combinedRet = SingleHMBAndMeta(maybeNewBuf, outStream.getPos, numRows, blockMetas)
val combinedRet = SingleHMBAndMeta(Array(maybeNewBuf), outStream.getPos, numRows,
blockMetas)
val newHmbWithMeta = metaToUse.copy(
memBuffersAndSizes = Array(combinedRet),
allPartValues = Some(combinedMeta.allPartValues))
Expand Down
Loading
Loading