From de960740609e0b05477d5a3cba61089e2ac3777c Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 16 Dec 2024 16:29:36 -0600 Subject: [PATCH 1/5] Avoid concatenating multiple host buffers when reading Parquet Signed-off-by: Jason Lowe --- .../com/nvidia/spark/rapids/DumpUtils.scala | 24 ++- .../spark/rapids/GpuMultiFileReader.scala | 33 ++-- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 27 +-- .../nvidia/spark/rapids/GpuParquetScan.scala | 155 +++++++----------- .../apache/spark/sql/rapids/GpuAvroScan.scala | 15 +- .../rapids/GpuMultiFileReaderSuite.scala | 4 +- 6 files changed, 124 insertions(+), 134 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index 64b08162557..fcff6192d59 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +65,28 @@ object DumpUtils extends Logging { } } + def dumpBuffer( + conf: Configuration, + data: Array[HostMemoryBuffer], + prefix: String, + suffix: String): String = { + try { + val (out, path) = FileUtils.createTempFile(conf, prefix, suffix) + withResource(out) { _ => + data.foreach { hmb => + withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in => + IOUtils.copy(in, out) + } + } + } + path.toString + } catch { + case e: Exception => + log.error(s"Error attempting to dump data", e) + s"" + } + } + /** * Debug utility to dump columnar batch to parquet file.
* It's running on GPU. Parquet column names are generated from columnar batch type info.
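The new dumpBuffer overload above streams each host buffer into a single temp file in order, which is what lets the debug-dump path accept the multi-buffer reads introduced later in this series without first concatenating them on the host. A minimal sketch of that streaming pattern, separated from the plugin's FileUtils/Configuration plumbing (writeBuffersTo and its OutputStream parameter are illustrative names, not part of the patch):

import java.io.OutputStream

import ai.rapids.cudf.{HostMemoryBuffer, HostMemoryInputStream}
import com.nvidia.spark.rapids.Arm.withResource
import org.apache.commons.io.IOUtils

// Copy each buffer, in order, into one output stream. The bytes written are
// identical to what a single concatenated HostMemoryBuffer would contain, but
// no host allocation of the combined size is ever made.
def writeBuffersTo(out: OutputStream, buffers: Array[HostMemoryBuffer]): Unit = {
  buffers.foreach { hmb =>
    withResource(new HostMemoryInputStream(hmb, hmb.getLength)) { in =>
      IOUtils.copy(in, out)
    }
  }
}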
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index ab02d1f0eea..6b5f28196ea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import scala.language.implicitConversions import ai.rapids.cudf.{HostMemoryBuffer, NvtxColor, NvtxRange, Table} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric.{BUFFER_TIME, FILTER_TIME} -import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq +import com.nvidia.spark.rapids.RapidsPluginImplicits.{AutoCloseableArray, AutoCloseableProducingSeq} import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -53,13 +53,13 @@ import org.apache.spark.util.SerializableConfiguration * This contains a single HostMemoryBuffer along with other metadata needed * for combining the buffers before sending to GPU. */ -case class SingleHMBAndMeta(hmb: HostMemoryBuffer, bytes: Long, numRows: Long, +case class SingleHMBAndMeta(hmbs: Array[HostMemoryBuffer], bytes: Long, numRows: Long, blockMeta: Seq[DataBlockBase]) object SingleHMBAndMeta { // Contains no data but could have number of rows for things like count(). def empty(numRows: Long = 0): SingleHMBAndMeta = { - SingleHMBAndMeta(null.asInstanceOf[HostMemoryBuffer], 0, numRows, Seq.empty) + SingleHMBAndMeta(Array.empty, 0, numRows, Seq.empty) } } @@ -715,9 +715,7 @@ abstract class MultiFileCloudPartitionReaderBase( private def closeCurrentFileHostBuffers(): Unit = { currentFileHostBuffers.foreach { current => current.memBuffersAndSizes.foreach { hbInfo => - if (hbInfo.hmb != null) { - hbInfo.hmb.close() - } + hbInfo.hmbs.safeClose() } } currentFileHostBuffers = None @@ -732,9 +730,7 @@ abstract class MultiFileCloudPartitionReaderBase( tasks.asScala.foreach { task => if (task.isDone()) { task.get.memBuffersAndSizes.foreach { hmbInfo => - if (hmbInfo.hmb != null) { - hmbInfo.hmb.close() - } + hmbInfo.hmbs.safeClose() } } else { // Note we are not interrupting thread here so it @@ -1049,9 +1045,9 @@ abstract class MultiFileCoalescingPartitionReaderBase( if (currentChunkMeta.currentChunk.isEmpty) { CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { - val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk, + val dataBuffer = readPartFiles(currentChunkMeta.currentChunk, currentChunkMeta.clippedSchema) - if (dataSize == 0) { + if (dataBuffer.getLength == 0) { dataBuffer.close() CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { @@ -1060,8 +1056,8 @@ abstract class MultiFileCoalescingPartitionReaderBase( // We don't want to actually close the host buffer until we know that we don't // want to retry more, so offset the close for now. 
dataBuffer.incRefCount() - val tableReader = readBufferToTablesAndClose(dataBuffer, - dataSize, currentChunkMeta.clippedSchema, currentChunkMeta.readSchema, + val tableReader = readBufferToTablesAndClose(dataBuffer, dataBuffer.getLength, + currentChunkMeta.clippedSchema, currentChunkMeta.readSchema, currentChunkMeta.extraInfo) CachedGpuBatchIterator(tableReader, colTypes) } @@ -1082,12 +1078,11 @@ abstract class MultiFileCoalescingPartitionReaderBase( * Read all data blocks into HostMemoryBuffer * @param blocks a sequence of data blocks to be read * @param clippedSchema the clipped schema is used to calculate the estimated output size - * @return (HostMemoryBuffer, Long) - * the HostMemoryBuffer and its data size + * @return the HostMemoryBuffer */ private def readPartFiles( blocks: Seq[(Path, DataBlockBase)], - clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = { + clippedSchema: SchemaBase): HostMemoryBuffer = { withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, metrics("bufferTime"))) { _ => @@ -1178,7 +1173,9 @@ abstract class MultiFileCoalescingPartitionReaderBase( } logDebug(s"$getFileFormatShortName Coalescing reading estimates the initTotalSize:" + s" $initTotalSize, and the true size: $finalBufferSize") - (finalBuffer, finalBufferSize) + withResource(finalBuffer) { _ => + finalBuffer.slice(0, finalBufferSize) + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index ababb052f27..4c418509e13 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
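The readPartFiles change above (and the matching readPartFile change in GpuParquetScan.scala below) drops the separate data-size return value: the estimated-size allocation is written to, then a slice covering exactly the bytes written is returned and the oversized parent handle is closed, relying on the slice keeping the underlying allocation alive until the slice itself is closed. A condensed sketch of that allocate-write-slice pattern under that assumption (writeToSizedBuffer and its write callback are illustrative names, not from the patch):

import ai.rapids.cudf.HostMemoryBuffer
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}

// Allocate an estimated size, let the caller write into it, then hand back a
// buffer whose getLength is the number of bytes actually written, so callers
// no longer need to carry a separate (buffer, size) pair.
def writeToSizedBuffer(estimatedSize: Long)(write: HostMemoryBuffer => Long): HostMemoryBuffer = {
  closeOnExcept(HostMemoryBuffer.allocate(estimatedSize)) { hmb =>
    val bytesWritten = write(hmb)
    withResource(hmb) { _ =>       // release the handle on the oversized parent
      hmb.slice(0, bytesWritten)   // exact-length view over the written bytes
    }
  }
}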
@@ -2105,12 +2105,12 @@ class MultiFileCloudOrcPartitionReader( val (hostBuf, bufSize) = readPartFile(ctx, blocksToRead) val numRows = blocksToRead.map(_.infoBuilder.getNumberOfRows).sum val metas = blocksToRead.map(b => OrcDataStripe(OrcStripeWithMeta(b, ctx))) - hostBuffers += SingleHMBAndMeta(hostBuf, bufSize, numRows, metas) + hostBuffers += SingleHMBAndMeta(Array(hostBuf), bufSize, numRows, metas) } val bytesRead = fileSystemBytesRead() - startingBytesRead if (isDone) { // got close before finishing - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.foreach(_.hmbs.safeClose()) logDebug("Reader is closed, return empty buffer for the current read for " + s"file: ${partFile.filePath.toString}") HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema) @@ -2123,7 +2123,7 @@ class MultiFileCloudOrcPartitionReader( } } catch { case e: Throwable => - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.foreach(_.hmbs.foreach(_.safeClose())) throw e } val bufferTime = System.nanoTime() - bufferTimeStart @@ -2223,9 +2223,10 @@ class MultiFileCloudOrcPartitionReader( case buffer: HostMemoryBuffersWithMetaData => val memBuffersAndSize = buffer.memBuffersAndSizes val hmbInfo = memBuffersAndSize.head - val batchIter = readBufferToBatches(hmbInfo.hmb, hmbInfo.bytes, buffer.updatedReadSchema, - buffer.requestedMapping, filterHandler.isCaseSensitive, buffer.partitionedFile, - buffer.allPartValues) + require(hmbInfo.hmbs.length == 1) + val batchIter = readBufferToBatches(hmbInfo.hmbs.head, hmbInfo.bytes, + buffer.updatedReadSchema, buffer.requestedMapping, filterHandler.isCaseSensitive, + buffer.partitionedFile, buffer.allPartValues) if (memBuffersAndSize.length > 1) { val updatedBuffers = memBuffersAndSize.drop(1) currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers)) @@ -2315,9 +2316,10 @@ class MultiFileCloudOrcPartitionReader( toCombine.foreach { hmbWithMeta => hmbWithMeta.memBuffersAndSizes.foreach { buf => val dataCopyAmount = buf.blockMeta.map(_.getBlockSize).sum - if (dataCopyAmount > 0 && buf.hmb != null) { + if (dataCopyAmount > 0 && buf.hmbs.nonEmpty) { + require(buf.hmbs.length == 1) combinedBuf.copyFromHostBuffer( - offset, buf.hmb, OrcTools.ORC_MAGIC.length, dataCopyAmount) + offset, buf.hmbs.head, OrcTools.ORC_MAGIC.length, dataCopyAmount) } // update the offset for each stripe var stripeOffset = offset @@ -2326,9 +2328,7 @@ class MultiFileCloudOrcPartitionReader( stripeOffset += block.getBlockSize } offset += dataCopyAmount - if (buf.hmb != null) { - buf.hmb.close() - } + buf.hmbs.safeClose() allOutputStripes ++= buf.blockMeta.map(_.stripe) } } @@ -2365,7 +2365,8 @@ class MultiFileCloudOrcPartitionReader( // e: Create the new meta for the combined buffer val numRows = combinedMeta.allPartValues.map(_._1).sum - val combinedRet = SingleHMBAndMeta(maybeNewBuf, outStream.getPos, numRows, blockMetas) + val combinedRet = SingleHMBAndMeta(Array(maybeNewBuf), outStream.getPos, numRows, + blockMetas) val newHmbWithMeta = metaToUse.copy( memBuffersAndSizes = Array(combinedRet), allPartValues = Some(combinedMeta.allPartValues)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 03eb48de6fb..a7677476fff 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. 
+ * Copyright (c) 2019-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import java.io.{Closeable, EOFException, FileNotFoundException, InputStream, IOException, OutputStream} +import java.io.{Closeable, EOFException, FileNotFoundException, IOException, InputStream, OutputStream} import java.net.URI import java.nio.ByteBuffer import java.nio.channels.SeekableByteChannel @@ -34,13 +34,12 @@ import ai.rapids.cudf._ import com.github.luben.zstd.ZstdDecompressCtx import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.GpuMetric._ -import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy} +import com.nvidia.spark.rapids.ParquetPartitionReader.{CopyRange, LocalCopy, PARQUET_MAGIC} import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.filecache.FileCache import com.nvidia.spark.rapids.jni.{DateTimeRebase, ParquetFooter} import com.nvidia.spark.rapids.shims.{ColumnDefaultValuesShims, GpuParquetCrypto, GpuTypeShims, ParquetLegacyNanoAsLongShims, ParquetSchemaClipShims, ParquetStringPredShims, ShimFilePartitionReaderFactory, SparkShimImpl} -import org.apache.commons.io.IOUtils import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, Path} @@ -1930,7 +1929,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics protected def readPartFile( blocks: Seq[BlockMetaData], clippedSchema: MessageType, - filePath: Path): (HostMemoryBuffer, Long, Seq[BlockMetaData]) = { + filePath: Path): (HostMemoryBuffer, Seq[BlockMetaData]) = { withResource(new NvtxRange("Parquet buffer file split", NvtxColor.YELLOW)) { _ => val estTotalSize = calculateParquetOutputSize(blocks, clippedSchema, false) closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb => @@ -1950,7 +1949,10 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " + s"small, actual written: ${out.getPos}") } - (hmb, out.getPos, outputBlocks) + val outputBuffer = withResource(hmb) { _ => + hmb.slice(0, out.getPos) + } + (outputBuffer, outputBlocks) } } } @@ -2267,7 +2269,7 @@ class MultiFileParquetPartitionReader( MakeParquetTableProducer(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, conf, currentTargetBatchSize, parseOpts, - dataBuffer, 0, dataSize, metrics, + Array(dataBuffer), metrics, extraInfo.dateRebaseMode, extraInfo.timestampRebaseMode, extraInfo.hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, splits, debugDumpPrefix, debugDumpAlways) @@ -2412,80 +2414,54 @@ class MultiFileCloudParquetPartitionReader( logDebug(s"Using Combine mode and actually combining, num files ${toCombineHmbs.size} " + s"files: ${toCombineHmbs.map(_.partitionedFile.filePath).mkString(",")}") val startCombineTime = System.currentTimeMillis() - // this size includes the written header and footer on each buffer so remove - // the size of those to get data size - val existingSizeUsed = toCombineHmbs.map { hbWithMeta => - hbWithMeta.memBuffersAndSizes.map(smb => Math.max(smb.bytes - PARQUET_META_SIZE, 0)).sum - }.sum // since we know not all of them are empty and we know all these have the 
same schema since // we already separated, just use the clippedSchema from metadata val schemaToUse = metaToUse.clippedSchema - val blocksAlreadyRead = toCombineHmbs.flatMap(_.memBuffersAndSizes.flatMap(_.blockMeta)) - val footerSize = calculateParquetFooterSize(blocksAlreadyRead.toSeq, schemaToUse) - // all will have same schema so same number of columns - val numCols = blocksAlreadyRead.head.getColumns().size() - val extraMemory = calculateExtraMemoryForParquetFooter(numCols, blocksAlreadyRead.size) - val initTotalSize = existingSizeUsed + footerSize + extraMemory - val combined = closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { combinedHmb => - var offset = withResource(new HostMemoryOutputStream(combinedHmb)) { out => - out.write(ParquetPartitionReader.PARQUET_MAGIC) - out.getPos - } + val combined = closeOnExcept(new ArrayBuffer[HostMemoryBuffer]) { buffers => + var offset: Long = PARQUET_MAGIC.size val allOutputBlocks = new ArrayBuffer[BlockMetaData]() - // copy the actual data + // zero-copy the data toCombineHmbs.map { hbWithMeta => hbWithMeta.memBuffersAndSizes.map { hmbInfo => - val copyAmount = hmbInfo.blockMeta.map { meta => + val columnDataSize = hmbInfo.blockMeta.map { meta => meta.getColumns.asScala.map(_.getTotalSize).sum }.sum - if (copyAmount > 0 && hmbInfo.hmb != null) { - combinedHmb.copyFromHostBuffer(offset, hmbInfo.hmb, - ParquetPartitionReader.PARQUET_MAGIC.size, copyAmount) + if (columnDataSize > 0 && hmbInfo.hmbs.nonEmpty) { + val bytesToSlice = if (buffers.isEmpty) { + columnDataSize + PARQUET_MAGIC.size + } else { + columnDataSize + } + val sliceOffset = if (buffers.isEmpty) 0 else PARQUET_MAGIC.size + require(hmbInfo.hmbs.length == 1) + buffers += hmbInfo.hmbs.head.slice(sliceOffset, bytesToSlice) } val outputBlocks = computeBlockMetaData(hmbInfo.blockMeta, offset) allOutputBlocks ++= outputBlocks - offset += copyAmount - if (hmbInfo.hmb != null) { - hmbInfo.hmb.close() - } + offset += columnDataSize + hmbInfo.hmbs.foreach(_.close()) + } + } + if (buffers.isEmpty) { + closeOnExcept(HostMemoryBuffer.allocate(PARQUET_MAGIC.size)) { hmb => + hmb.setBytes(0, PARQUET_MAGIC, 0, PARQUET_MAGIC.size); + buffers += hmb } } // using all of the actual combined output blocks meta calculate what the footer size // will really be val actualFooterSize = calculateParquetFooterSize(allOutputBlocks.toSeq, schemaToUse) - var buf: HostMemoryBuffer = combinedHmb - val totalBufferSize = if ((initTotalSize - offset) < actualFooterSize) { - val newBufferSize = offset + actualFooterSize + 4 + 4 - logWarning(s"The original estimated size $initTotalSize is too small, " + - s"reallocating and copying data to bigger buffer size: $newBufferSize") - // Copy the old buffer to a new allocated bigger buffer and close the old buffer - buf = withResource(combinedHmb) { _ => - withResource(new HostMemoryInputStream(combinedHmb, offset)) { in => - // realloc memory and copy - closeOnExcept(HostMemoryBuffer.allocate(newBufferSize)) { newhmb => - withResource(new HostMemoryOutputStream(newhmb)) { out => - IOUtils.copy(in, out) - } - newhmb - } - } - } - newBufferSize - } else { - initTotalSize - } - - withResource(buf.slice(offset, (totalBufferSize - offset))) { footerHmbSlice => - withResource(new HostMemoryOutputStream(footerHmbSlice)) { footerOut => - writeFooter(footerOut, allOutputBlocks.toSeq, schemaToUse) - BytesUtils.writeIntLittleEndian(footerOut, footerOut.getPos.toInt) - footerOut.write(ParquetPartitionReader.PARQUET_MAGIC) - offset += footerOut.getPos - } + val footerBuf = 
HostMemoryBuffer.allocate(actualFooterSize + 8) + buffers += footerBuf + withResource(new HostMemoryOutputStream(footerBuf)) { footerOut => + writeFooter(footerOut, allOutputBlocks.toSeq, schemaToUse) + BytesUtils.writeIntLittleEndian(footerOut, footerOut.getPos.toInt) + footerOut.write(ParquetPartitionReader.PARQUET_MAGIC) + offset += footerOut.getPos } - val newHmbBufferInfo = SingleHMBAndMeta(buf, offset, + val newHmbBufferInfo = SingleHMBAndMeta(buffers.toArray, offset, combinedMeta.allPartValues.map(_._1).sum, Seq.empty) val newHmbMeta = HostMemoryBuffersWithMetaData( metaToUse.partitionedFile, @@ -2712,16 +2688,16 @@ class MultiFileCloudParquetPartitionReader( while (blockChunkIter.hasNext) { val blocksToRead = populateCurrentBlockChunk(blockChunkIter, maxReadBatchSizeRows, maxReadBatchSizeBytes, fileBlockMeta.readSchema) - val (dataBuffer, dataSize, blockMeta) = + val (dataBuffer, blockMeta) = readPartFile(blocksToRead, fileBlockMeta.schema, filePath) val numRows = blocksToRead.map(_.getRowCount).sum.toInt - hostBuffers += SingleHMBAndMeta(dataBuffer, dataSize, + hostBuffers += SingleHMBAndMeta(Array(dataBuffer), dataBuffer.getLength, numRows, blockMeta) } val bytesRead = fileSystemBytesRead() - startingBytesRead if (isDone) { // got close before finishing - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.foreach(_.hmbs.safeClose()) HostMemoryEmptyMetaData(file, origPartitionedFile, 0, bytesRead, fileBlockMeta.dateRebaseMode, fileBlockMeta.timestampRebaseMode, fileBlockMeta.hasInt96Timestamps, fileBlockMeta.schema, @@ -2737,7 +2713,7 @@ class MultiFileCloudParquetPartitionReader( } } catch { case e: Throwable => - hostBuffers.foreach(_.hmb.safeClose()) + hostBuffers.foreach(_.hmbs.safeClose()) throw e } val bufferTime = System.nanoTime() - bufferStartTime @@ -2810,8 +2786,7 @@ class MultiFileCloudParquetPartitionReader( val hmbAndInfo = memBuffersAndSize.head val batchIter = readBufferToBatches(buffer.dateRebaseMode, buffer.timestampRebaseMode, buffer.hasInt96Timestamps, buffer.clippedSchema, - buffer.readSchema, buffer.partitionedFile, hmbAndInfo.hmb, hmbAndInfo.bytes, - buffer.allPartValues) + buffer.readSchema, buffer.partitionedFile, hmbAndInfo.hmbs, buffer.allPartValues) if (memBuffersAndSize.length > 1) { val updatedBuffers = memBuffersAndSize.drop(1) currentFileHostBuffers = Some(buffer.copy(memBuffersAndSizes = updatedBuffers)) @@ -2829,11 +2804,10 @@ class MultiFileCloudParquetPartitionReader( clippedSchema: MessageType, readDataSchema: StructType, partedFile: PartitionedFile, - hostBuffer: HostMemoryBuffer, - dataSize: Long, + hostBuffers: Array[HostMemoryBuffer], allPartValues: Option[Array[(Long, InternalRow)]]): Iterator[ColumnarBatch] = { - val parseOpts = closeOnExcept(hostBuffer) { _ => + val parseOpts = closeOnExcept(hostBuffers) { _ => getParquetOptions(readDataSchema, clippedSchema, useFieldId) } val colTypes = readDataSchema.fields.map(f => f.dataType) @@ -2841,15 +2815,15 @@ class MultiFileCloudParquetPartitionReader( // about to start using the GPU GpuSemaphore.acquireIfNecessary(TaskContext.get()) - RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ => +// RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ => // The MakeParquetTableProducer will close the input buffer, and that would be bad // because we don't want to close it until we know that we are done with it - hostBuffer.incRefCount() +// hostBuffer.incRefCount() val tableReader = MakeParquetTableProducer(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, 
parseOpts, - hostBuffer, 0, dataSize, metrics, + hostBuffers, metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files, debugDumpPrefix, debugDumpAlways) @@ -2872,7 +2846,7 @@ class MultiFileCloudParquetPartitionReader( } } } - } +// } } object MakeParquetTableProducer extends Logging { @@ -2882,9 +2856,7 @@ object MakeParquetTableProducer extends Logging { conf: Configuration, chunkSizeByteLimit: Long, opts: ParquetOptions, - buffer: HostMemoryBuffer, - offset: Long, - len: Long, + buffers: Array[HostMemoryBuffer], metrics : Map[String, GpuMetric], dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode, @@ -2899,30 +2871,29 @@ object MakeParquetTableProducer extends Logging { ): GpuDataProducer[Table] = { debugDumpPrefix.foreach { prefix => if (debugDumpAlways) { - val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet") + val p = DumpUtils.dumpBuffer(conf, buffers, prefix, ".parquet") logWarning(s"Wrote data for ${splits.mkString(", ")} to $p") } } if (useChunkedReader) { ParquetTableReader(conf, chunkSizeByteLimit, maxChunkedReaderMemoryUsageSizeBytes, - opts, buffer, offset, - len, metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, + opts, buffers, metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, clippedParquetSchema, splits, debugDumpPrefix, debugDumpAlways) } else { - val table = withResource(buffer) { _ => + val table = withResource(buffers) { _ => try { RmmRapidsRetryIterator.withRetryNoSplit[Table] { withResource(new NvtxWithMetrics("Parquet decode", NvtxColor.DARK_GREEN, metrics(GPU_DECODE_TIME))) { _ => - Table.readParquet(opts, buffer, offset, len) + Table.readParquet(opts, buffers:_*) } } } catch { case e: Exception => val dumpMsg = debugDumpPrefix.map { prefix => if (!debugDumpAlways) { - val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet") + val p = DumpUtils.dumpBuffer(conf, buffers, prefix, ".parquet") s", data dumped to $p" } else { "" @@ -2954,9 +2925,7 @@ case class ParquetTableReader( chunkSizeByteLimit: Long, maxChunkedReaderMemoryUsageSizeBytes: Long, opts: ParquetOptions, - buffer: HostMemoryBuffer, - offset: Long, - len: Long, + buffers: Array[HostMemoryBuffer], metrics : Map[String, GpuMetric], dateRebaseMode: DateTimeRebaseMode, timestampRebaseMode: DateTimeRebaseMode, @@ -2969,7 +2938,7 @@ case class ParquetTableReader( debugDumpPrefix: Option[String], debugDumpAlways: Boolean) extends GpuDataProducer[Table] with Logging { private[this] val reader = new ParquetChunkedReader(chunkSizeByteLimit, - maxChunkedReaderMemoryUsageSizeBytes, opts, buffer, offset, len) + maxChunkedReaderMemoryUsageSizeBytes, opts, buffers:_*) private[this] lazy val splitsString = splits.mkString("; ") @@ -2984,7 +2953,7 @@ case class ParquetTableReader( case e: Exception => val dumpMsg = debugDumpPrefix.map { prefix => if (!debugDumpAlways) { - val p = DumpUtils.dumpBuffer(conf, buffer, offset, len, prefix, ".parquet") + val p = DumpUtils.dumpBuffer(conf, buffers, prefix, ".parquet") s", data dumped to $p" } else { "" @@ -3009,7 +2978,7 @@ case class ParquetTableReader( override def close(): Unit = { reader.close() - buffer.close() + buffers.safeClose() } } @@ -3100,10 +3069,10 @@ class ParquetPartitionReader( CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { val parseOpts = getParquetOptions(readDataSchema, clippedParquetSchema, useFieldId) - val 
(dataBuffer, dataSize, _) = metrics(BUFFER_TIME).ns { + val (dataBuffer, _) = metrics(BUFFER_TIME).ns { readPartFile(currentChunkedBlocks, clippedParquetSchema, filePath) } - if (dataSize == 0) { + if (dataBuffer.getLength == 0) { dataBuffer.close() CachedGpuBatchIterator(EmptyTableReader, colTypes) } else { @@ -3117,7 +3086,7 @@ class ParquetPartitionReader( val producer = MakeParquetTableProducer(useChunkedReader, maxChunkedReaderMemoryUsageSizeBytes, conf, targetBatchSizeBytes, parseOpts, - dataBuffer, 0, dataSize, metrics, + Array(dataBuffer), metrics, dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, isSchemaCaseSensitive, useFieldId, readDataSchema, diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index 02153b5cb87..b9ee03ae05a 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. + * Copyright (c) 2022-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -663,7 +663,7 @@ class GpuMultiFileCloudAvroPartitionReader( val bufsAndSizes = buffer.memBuffersAndSizes val bufAndSizeInfo = bufsAndSizes.head val partitionValues = buffer.partitionedFile.partitionValues - val batchIter = if (bufAndSizeInfo.hmb == null) { + val batchIter = if (bufAndSizeInfo.hmbs.nonEmpty) { // Not reading any data, but add in partition data if needed // Someone is going to process this data, even if it is just a row count GpuSemaphore.acquireIfNecessary(TaskContext.get()) @@ -671,7 +671,8 @@ class GpuMultiFileCloudAvroPartitionReader( BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(emptyBatch, partitionValues, partitionSchema, maxGpuColumnSizeBytes) } else { - val maybeBatch = sendToGpu(bufAndSizeInfo.hmb, bufAndSizeInfo.bytes, files) + require(bufAndSizeInfo.hmbs.length == 1) + val maybeBatch = sendToGpu(bufAndSizeInfo.hmbs.head, bufAndSizeInfo.bytes, files) // we have to add partition values here for this batch, we already verified that // it's not different for all the blocks in this batch maybeBatch match { @@ -845,18 +846,18 @@ class GpuMultiFileCloudAvroPartitionReader( // One batch is done optOut.foreach(out => hostBuffers += - (SingleHMBAndMeta(optHmb.get, out.getPos, batchRowsNum, Seq.empty))) + (SingleHMBAndMeta(Array(optHmb.get), out.getPos, batchRowsNum, Seq.empty))) totalRowsNum += batchRowsNum estBlocksSize -= batchSize } } // end of while val bufAndSize: Array[SingleHMBAndMeta] = if (readDataSchema.isEmpty) { - hostBuffers.foreach(_.hmb.safeClose(new Exception)) + hostBuffers.foreach(_.hmbs.safeClose()) Array(SingleHMBAndMeta.empty(totalRowsNum)) } else if (isDone) { // got close before finishing, return null buffer and zero size - hostBuffers.foreach(_.hmb.safeClose(new Exception)) + hostBuffers.foreach(_.hmbs.safeClose()) Array(SingleHMBAndMeta.empty()) } else { hostBuffers.toArray @@ -864,7 +865,7 @@ class GpuMultiFileCloudAvroPartitionReader( createBufferAndMeta(bufAndSize, startingBytesRead) } catch { case e: Throwable => - hostBuffers.foreach(_.hmb.safeClose(e)) + hostBuffers.foreach(_.hmbs.safeClose(e)) throw e } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala index 
21f364e085b..9dc0035675e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuMultiFileReaderSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,7 @@ class GpuMultiFileReaderSuite extends AnyFunSuite { val conf = new Configuration(false) val membuffers = Array(SingleHMBAndMeta( - HostMemoryBuffer.allocate(0), 0L, 0, Seq.empty)) + Array(HostMemoryBuffer.allocate(0)), 0L, 0, Seq.empty)) val multiFileReader = new MultiFileCloudPartitionReaderBase( conf, inputFiles = Array.empty, From 1150c4ff9409f6a5471f8157c57a76e89bd4cbc7 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Fri, 3 Jan 2025 15:23:16 -0600 Subject: [PATCH 2/5] Restore withRetry block --- .../nvidia/spark/rapids/GpuParquetScan.scala | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index a7677476fff..4fa0d209190 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -2815,38 +2815,40 @@ class MultiFileCloudParquetPartitionReader( // about to start using the GPU GpuSemaphore.acquireIfNecessary(TaskContext.get()) -// RmmRapidsRetryIterator.withRetryNoSplit(hostBuffer) { _ => - // The MakeParquetTableProducer will close the input buffer, and that would be bad - // because we don't want to close it until we know that we are done with it -// hostBuffer.incRefCount() - val tableReader = MakeParquetTableProducer(useChunkedReader, - maxChunkedReaderMemoryUsageSizeBytes, - conf, targetBatchSizeBytes, - parseOpts, - hostBuffers, metrics, - dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, - isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files, - debugDumpPrefix, debugDumpAlways) - - val batchIter = CachedGpuBatchIterator(tableReader, colTypes) - - if (allPartValues.isDefined) { - val allPartInternalRows = allPartValues.get.map(_._2) - val rowsPerPartition = allPartValues.get.map(_._1) - new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows, - rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes) - } else { - // this is a bit weird, we don't have number of rows when allPartValues isn't - // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator - batchIter.flatMap { batch => - // we have to add partition values here for this batch, we already verified that - // its not different for all the blocks in this batch - BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch, - partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes) + withResource(hostBuffers) { _ => + RmmRapidsRetryIterator.withRetryNoSplit { + // The MakeParquetTableProducer will close the input buffers, and that would be bad + // because we don't want to close them until we know that we are done with them. 
+ hostBuffers.foreach(_.incRefCount()) + val tableReader = MakeParquetTableProducer(useChunkedReader, + maxChunkedReaderMemoryUsageSizeBytes, + conf, targetBatchSizeBytes, + parseOpts, + hostBuffers, metrics, + dateRebaseMode, timestampRebaseMode, hasInt96Timestamps, + isSchemaCaseSensitive, useFieldId, readDataSchema, clippedSchema, files, + debugDumpPrefix, debugDumpAlways) + + val batchIter = CachedGpuBatchIterator(tableReader, colTypes) + + if (allPartValues.isDefined) { + val allPartInternalRows = allPartValues.get.map(_._2) + val rowsPerPartition = allPartValues.get.map(_._1) + new GpuColumnarBatchWithPartitionValuesIterator(batchIter, allPartInternalRows, + rowsPerPartition, partitionSchema, maxGpuColumnSizeBytes) + } else { + // this is a bit weird, we don't have number of rows when allPartValues isn't + // filled in so can't use GpuColumnarBatchWithPartitionValuesIterator + batchIter.flatMap { batch => + // we have to add partition values here for this batch, we already verified that + // its not different for all the blocks in this batch + BatchWithPartitionDataUtils.addSinglePartitionValueToBatch(batch, + partedFile.partitionValues, partitionSchema, maxGpuColumnSizeBytes) + } } } } -// } + } } object MakeParquetTableProducer extends Logging { From 0571d2a6ad5546654b47ab7699e0ed9b68969a6d Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 6 Jan 2025 14:31:23 -0600 Subject: [PATCH 3/5] Fix safeClose usage --- .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 4 ++-- .../main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 6 +++--- .../scala/org/apache/spark/sql/rapids/GpuAvroScan.scala | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 4c418509e13..278dbdceca4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -2110,7 +2110,7 @@ class MultiFileCloudOrcPartitionReader( val bytesRead = fileSystemBytesRead() - startingBytesRead if (isDone) { // got close before finishing - hostBuffers.foreach(_.hmbs.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose() logDebug("Reader is closed, return empty buffer for the current read for " + s"file: ${partFile.filePath.toString}") HostMemoryEmptyMetaData(partFile, 0, bytesRead, readDataSchema) @@ -2123,7 +2123,7 @@ class MultiFileCloudOrcPartitionReader( } } catch { case e: Throwable => - hostBuffers.foreach(_.hmbs.foreach(_.safeClose())) + hostBuffers.flatMap(_.hmbs).safeClose(e) throw e } val bufferTime = System.nanoTime() - bufferTimeStart diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 4fa0d209190..150dd8bd5a5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -2441,7 +2441,7 @@ class MultiFileCloudParquetPartitionReader( val outputBlocks = computeBlockMetaData(hmbInfo.blockMeta, offset) allOutputBlocks ++= outputBlocks offset += columnDataSize - hmbInfo.hmbs.foreach(_.close()) + hmbInfo.hmbs.safeClose() } } if (buffers.isEmpty) { @@ -2697,7 +2697,7 @@ class MultiFileCloudParquetPartitionReader( val bytesRead = fileSystemBytesRead() - startingBytesRead if (isDone) { // got close before finishing - hostBuffers.foreach(_.hmbs.safeClose()) 
+ hostBuffers.flatMap(_.hmbs).safeClose() HostMemoryEmptyMetaData(file, origPartitionedFile, 0, bytesRead, fileBlockMeta.dateRebaseMode, fileBlockMeta.timestampRebaseMode, fileBlockMeta.hasInt96Timestamps, fileBlockMeta.schema, @@ -2713,7 +2713,7 @@ class MultiFileCloudParquetPartitionReader( } } catch { case e: Throwable => - hostBuffers.foreach(_.hmbs.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose(e) throw e } val bufferTime = System.nanoTime() - bufferStartTime diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index b9ee03ae05a..c2a4f453761 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -853,11 +853,11 @@ class GpuMultiFileCloudAvroPartitionReader( } // end of while val bufAndSize: Array[SingleHMBAndMeta] = if (readDataSchema.isEmpty) { - hostBuffers.foreach(_.hmbs.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose() Array(SingleHMBAndMeta.empty(totalRowsNum)) } else if (isDone) { // got close before finishing, return null buffer and zero size - hostBuffers.foreach(_.hmbs.safeClose()) + hostBuffers.flatMap(_.hmbs).safeClose() Array(SingleHMBAndMeta.empty()) } else { hostBuffers.toArray @@ -865,7 +865,7 @@ class GpuMultiFileCloudAvroPartitionReader( createBufferAndMeta(bufAndSize, startingBytesRead) } catch { case e: Throwable => - hostBuffers.foreach(_.hmbs.safeClose(e)) + hostBuffers.flatMap(_.hmbs).safeClose(e) throw e } } From f00326eaa688ae3897510d04fd49e0224260437d Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Wed, 8 Jan 2025 07:44:48 -0800 Subject: [PATCH 4/5] Import order/scalastyle issue --- .../src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 26655462f44..277e0c6b934 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids -import java.io.{Closeable, EOFException, FileNotFoundException, IOException, InputStream, OutputStream} +import java.io.{Closeable, EOFException, FileNotFoundException, InputStream, IOException, OutputStream} import java.net.URI import java.nio.ByteBuffer import java.nio.channels.SeekableByteChannel From 675fe5fe0045af76680f1fc687873e27a89270c1 Mon Sep 17 00:00:00 2001 From: Alessandro Bellina Date: Tue, 14 Jan 2025 08:26:02 -0800 Subject: [PATCH 5/5] Update GpuAvroScan.scala Fix issue in GpuAvroScan.readBatches --- .../main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala index 8200bb9eef6..d4a9da091db 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuAvroScan.scala @@ -663,7 +663,7 @@ class GpuMultiFileCloudAvroPartitionReader( val bufsAndSizes = buffer.memBuffersAndSizes val bufAndSizeInfo = bufsAndSizes.head val partitionValues = buffer.partitionedFile.partitionValues val
batchIter = if (bufAndSizeInfo.hmbs.isEmpty) { // Not reading any data, but add in partition data if needed // Someone is going to process this data, even if it is just a row count GpuSemaphore.acquireIfNecessary(TaskContext.get())
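Taken together, the series replaces the copy-into-one-buffer combine with an array of zero-copy slices: the first part keeps its leading PARQUET_MAGIC, later parts contribute only their column data, and a freshly serialized footer buffer is appended last before the whole array goes to Table.readParquet or ParquetChunkedReader. A condensed sketch of that assembly step under the same assumptions the patch makes (slice is zero-copy and the readers accept multiple buffers laid end to end); combineWithoutCopy, parts, and columnDataSizes are illustrative names:

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.HostMemoryBuffer

// parts(i) holds MAGIC + column data + per-file footer for one buffered read;
// columnDataSizes(i) is the column-data byte count for that part.
def combineWithoutCopy(
    parts: Seq[HostMemoryBuffer],
    columnDataSizes: Seq[Long],
    footer: HostMemoryBuffer,
    magicLen: Long): Array[HostMemoryBuffer] = {
  val buffers = new ArrayBuffer[HostMemoryBuffer]()
  parts.zip(columnDataSizes).foreach { case (hmb, dataSize) =>
    // Keep the leading magic only from the first part; later parts are sliced
    // to start just past their own magic so no header bytes are repeated.
    val offset = if (buffers.isEmpty) 0L else magicLen
    val len = if (buffers.isEmpty) dataSize + magicLen else dataSize
    buffers += hmb.slice(offset, len)
  }
  buffers += footer  // serialized combined footer + footer length + trailing magic
  buffers.toArray    // e.g. passed as Table.readParquet(opts, result: _*), as above
}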