Support async writing for query output
Signed-off-by: Jihoon Son <[email protected]>
jihoonson committed Nov 18, 2024
1 parent 9b06ae3 commit 7e41ff6
Showing 10 changed files with 841 additions and 11 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,11 +25,13 @@ import com.nvidia.spark.Retryable
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.RmmRapidsRetryIterator.{splitSpillableInHalfByRows, withRestoreOnRetry, withRetry, withRetryNoSplit}
import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.{ColumnarWriteTaskStatsTracker, GpuWriteTaskStatsTracker}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -70,21 +72,34 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType,
rangeName: String,
includeRetry: Boolean) extends HostBufferConsumer {
includeRetry: Boolean,
enableAsyncWrite: Boolean = false,
holdGpuBetweenBatches: Boolean = false) extends HostBufferConsumer with Logging {

protected val tableWriter: TableWriter

protected val conf: Configuration = context.getConfiguration

// This is implemented as a method to make it easier to subclass
// ColumnarOutputWriter in the tests, and override this behavior.
protected def getOutputStream: FSDataOutputStream = {
private val trafficController: TrafficController = TrafficController.getInstance

private def openOutputStream(): OutputStream = {
val hadoopPath = new Path(path)
val fs = hadoopPath.getFileSystem(conf)
fs.create(hadoopPath, false)
}

protected val outputStream: FSDataOutputStream = getOutputStream
// This is implemented as a method to make it easier to subclass
// ColumnarOutputWriter in the tests, and override this behavior.
protected def getOutputStream: OutputStream = {
if (enableAsyncWrite) {
logDebug("Async output write enabled")
new AsyncOutputStream(() => openOutputStream(), trafficController)
} else {
openOutputStream()
}
}

protected val outputStream: OutputStream = getOutputStream

private[this] val tempBuffer = new Array[Byte](128 * 1024)
private[this] var anythingWritten = false
@@ -166,7 +181,11 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
}
// we successfully buffered to host memory, release the semaphore and write
// the buffered data to the FS
GpuSemaphore.releaseIfNecessary(TaskContext.get)
if (!holdGpuBetweenBatches) {
logDebug("Releasing semaphore between batches")
GpuSemaphore.releaseIfNecessary(TaskContext.get)
}

writeBufferedData()
updateStatistics(writeStartTime, gpuTime, statsTrackers)
spillableBatch.numRows()
@@ -202,6 +221,10 @@ abstract class ColumnarOutputWriter(context: TaskAttemptContext,
// buffer an empty batch on close() to work around issues in cuDF
// where corrupt files can be written if nothing is encoded via the writer.
anythingWritten = true

// tableWriter.write() serializes the table into the HostMemoryBuffer, and buffers it
// by calling handleBuffer() on the ColumnarOutputWriter. It may not write to the
// output stream just yet.
tableWriter.write(table)
}
}
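
For context on the buffer-then-flush flow that the new comment above describes: tableWriter.write() only serializes into host memory via handleBuffer(), and the data reaches the (possibly asynchronous) output stream later, when writeBufferedData() runs. Below is a minimal, hypothetical sketch of that pattern in plain Scala; BufferedHostWriter, buffer(), and flushTo() are illustrative names with simplified types (Array[Byte] instead of HostMemoryBuffer), not code from this commit.

import java.io.OutputStream
import scala.collection.mutable

class BufferedHostWriter {
  // Host-side buffers queued while the GPU result is still being encoded
  // (analogous to handleBuffer() on HostBufferConsumer).
  private val pending = mutable.Queue[Array[Byte]]()

  def buffer(bytes: Array[Byte]): Unit = pending.enqueue(bytes)

  // Drains the queued buffers to the stream (analogous to writeBufferedData()).
  // With async write enabled, `out` would be an AsyncOutputStream, so this only
  // schedules the writes and returns quickly.
  def flushTo(out: OutputStream): Unit =
    while (pending.nonEmpty) out.write(pending.dequeue())
}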
@@ -271,13 +271,19 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
s"Set Parquet option ${ParquetOutputFormat.JOB_SUMMARY_LEVEL} to NONE.")
}

val asyncOutputWriteEnabled = RapidsConf.ENABLE_ASYNC_OUTPUT_WRITE.get(sqlConf)
// holdGpuBetweenBatches is on by default if asyncOutputWriteEnabled is on
val holdGpuBetweenBatches = RapidsConf.ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK.get(sqlConf)
.getOrElse(asyncOutputWriteEnabled)

new ColumnarOutputWriterFactory {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): ColumnarOutputWriter = {
new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString,
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled)
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled,
asyncOutputWriteEnabled, holdGpuBetweenBatches)
}

override def getFileExtension(context: TaskAttemptContext): String = {
@@ -299,8 +305,11 @@ class GpuParquetWriter(
dateRebaseMode: DateTimeRebaseMode,
timestampRebaseMode: DateTimeRebaseMode,
context: TaskAttemptContext,
parquetFieldIdEnabled: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet", true) {
parquetFieldIdEnabled: Boolean,
enableAsyncWrite: Boolean,
holdGpuBetweenBatches: Boolean)
extends ColumnarOutputWriter(context, dataSchema, "Parquet", true, enableAsyncWrite,
holdGpuBetweenBatches) {
override def throwIfRebaseNeededInExceptionMode(batch: ColumnarBatch): Unit = {
val cols = GpuColumnVector.extractBases(batch)
cols.foreach { col =>
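
The two settings resolved above interact as the comment says: the hold-GPU flag is optional, and when it is not set explicitly it follows the async-write flag. A tiny illustrative helper makes the fallback explicit; resolveHoldGpu is hypothetical and not part of the commit.

// Hypothetical helper mirroring the getOrElse fallback used above.
def resolveHoldGpu(explicitSetting: Option[Boolean], asyncWriteEnabled: Boolean): Boolean =
  explicitSetting.getOrElse(asyncWriteEnabled)

// resolveHoldGpu(None, asyncWriteEnabled = true)        // true: follows the async setting
// resolveHoldGpu(Some(false), asyncWriteEnabled = true) // false: an explicit value wins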
@@ -31,6 +31,7 @@ import com.nvidia.spark.DFUDFPlugin
import com.nvidia.spark.rapids.RapidsConf.AllowMultipleJars
import com.nvidia.spark.rapids.RapidsPluginUtils.buildInfoEvent
import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
import com.nvidia.spark.rapids.io.async.TrafficController
import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import org.apache.commons.lang3.exception.ExceptionUtils
@@ -554,6 +555,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraExecutorPlugins.foreach(_.init(pluginContext, extraConf))
GpuSemaphore.initialize()
FileCache.init(pluginContext)
TrafficController.initialize(conf)
} catch {
// Exceptions in executor plugin can cause a single thread to die but the executor process
sticks around without any useful info until its heartbeat times out. Print what happened
@@ -656,6 +658,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
extraExecutorPlugins.foreach(_.shutdown())
FileCache.shutdown()
GpuCoreDumpHandler.shutdown()
TrafficController.shutdown()
}

override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
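
TrafficController is initialized once per executor in the plugin's init path and torn down in shutdown, because ColumnarOutputWriter later fetches the shared instance with TrafficController.getInstance on task threads, so the controller must exist before any task runs. A minimal sketch of that init-once/lookup/shutdown lifecycle follows; the object name and body are illustrative, and only the call pattern mirrors this diff.

// Illustrative singleton lifecycle, not the actual TrafficController implementation.
object ControllerLifecycleSketch {
  private var instance: Option[AnyRef] = None

  def initialize(conf: AnyRef): Unit = synchronized {
    if (instance.isEmpty) instance = Some(new Object) // would be built from RapidsConf
  }

  def getInstance: AnyRef = synchronized {
    instance.getOrElse(throw new IllegalStateException("controller is not initialized"))
  }

  def shutdown(): Unit = synchronized { instance = None }
}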
34 changes: 34 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -2395,6 +2395,35 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
.booleanConf
.createWithDefault(false)

val ENABLE_ASYNC_OUTPUT_WRITE =
conf("spark.rapids.sql.asyncWrite.queryOutput.enabled")
.doc("Option to turn on the async query output write. During the final output write, the " +
"task first copies the output to the host memory, and then writes it into the storage. " +
"When this option is enabled, the task will asynchronously write the output in the host " +
"memory to the storage. Only the Parquet format is supported currently.")
.booleanConf
.createWithDefault(false)

val ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK =
conf("spark.rapids.sql.asyncWrite.queryOutput.holdGpuInTask")
.doc("Option to hold GPU semaphore between batch processing during the final output write. " +
"This option could degrade query performance if it is enabled without the async query " +
"output write. It is recommended to consider enabling this option only when " +
s"${ENABLE_ASYNC_OUTPUT_WRITE.key} is set. This option is off by default when the async " +
"query output write is disabled; otherwise, it is on.")
.internal()
.booleanConf
.createOptional

val ASYNC_WRITE_MAX_IN_FLIGHT_HOST_MEMORY_BYTES =
conf("spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes")
.doc("Maximum number of host memory bytes per executor that can be in-flight for async " +
"query output write. Tasks may be blocked if the total host memory bytes in-flight " +
"exceeds this value.")
.internal()
.bytesConf(ByteUnit.BYTE)
.createWithDefault(2L * 1024 * 1024 * 1024)

private def printSectionHeader(category: String): Unit =
println(s"\n### $category")

@@ -2650,6 +2679,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isFoldableNonLitAllowed: Boolean = get(FOLDABLE_NON_LIT_ALLOWED)

lazy val asyncWriteMaxInFlightHostMemoryBytes: Long =
get(ASYNC_WRITE_MAX_IN_FLIGHT_HOST_MEMORY_BYTES)

/**
* Convert a string value to the injection configuration OomInjection.
*
@@ -3233,6 +3265,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val caseWhenFuseEnabled: Boolean = get(CASE_WHEN_FUSE)

lazy val isAsyncOutputWriteEnabled: Boolean = get(ENABLE_ASYNC_OUTPUT_WRITE)

private val optimizerDefaults = Map(
// this is not accurate because CPU projections do have a cost due to appending values
// to each row that is produced, but this needs to be a really small number because
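
For reference, the feature added above would typically be turned on through the public key only; the hold-GPU and in-flight-memory settings are internal and fall back to sensible defaults. A hedged usage example follows, assuming the RAPIDS Accelerator plugin is on the classpath and enabled; the application name and output path are made up.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("async-output-write-demo")
  // Public switch defined above; only Parquet output is supported, per its documentation.
  .config("spark.rapids.sql.asyncWrite.queryOutput.enabled", "true")
  .getOrCreate()

// spark.rapids.sql.asyncWrite.queryOutput.holdGpuInTask is left unset, so it follows the
// async-write switch as described above.
spark.range(1000000L).write.mode("overwrite").parquet("/tmp/async-write-demo")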
@@ -0,0 +1,180 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.io.async

import java.io.{IOException, OutputStream}
import java.util.concurrent.{Callable, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.ThreadFactoryBuilder

/**
* OutputStream that performs writes asynchronously. Writes are scheduled on a background thread
* and executed in the order they were scheduled. This class is not thread-safe and should only be
* used by a single thread.
*/
class AsyncOutputStream(openFn: Callable[OutputStream], trafficController: TrafficController)
extends OutputStream {

private var closed = false

private val executor = new ThrottlingExecutor(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("AsyncOutputStream")
.build()
),
trafficController)

private val openFuture = executor.submit(openFn, 0)
// Give the open call enough time to complete. Something must be wrong if it takes more
// than 5 minutes to open a stream.
private val openTimeoutMin = 5

private def delegate: OutputStream = {
openFuture.get(openTimeoutMin, TimeUnit.MINUTES)
}

class Metrics {
var numBytesScheduled: Long = 0
val numBytesWritten: AtomicLong = new AtomicLong(0)
}

val metrics = new Metrics

/**
* The last error that occurred in the background thread, or None if no error occurred.
* Once it is set, all subsequent writes that are already scheduled will fail and no new
* writes will be accepted.
*/
val lastError: AtomicReference[Option[Throwable]] =
new AtomicReference[Option[Throwable]](None)

@throws[IOException]
private def throwIfError(): Unit = {
lastError.get() match {
case Some(t) if t.isInstanceOf[IOException] => throw t
case Some(t) => throw new IOException(t)
case None =>
}
}

@throws[IOException]
private def ensureOpen(): Unit = {
if (closed) {
throw new IOException("Stream closed")
}
}

override def write(b: Int): Unit = {
throwIfError()
ensureOpen()

val buffer = new Array[Byte](1)
buffer(0) = b.toByte
write(buffer)
}

/**
* Schedules a write of the given bytes to the underlying stream. The write is executed
* asynchronously on a background thread. The method returns immediately, and the write may not
* have completed when the method returns.
*
* If an error has occurred in the background thread and [[lastError]] has been set, this function
* will throw an IOException immediately.
*
* If an error has occurred in the background thread while executing a previous write after the
* current write has been scheduled, the current write will fail with the same error.
*/
@throws[IOException]
override def write(b: Array[Byte], off: Int, len: Int): Unit = {
throwIfError()
ensureOpen()

metrics.numBytesScheduled += len
executor.submit(() => {
throwIfError()
ensureOpen()

try {
delegate.write(b, off, len)
metrics.numBytesWritten.addAndGet(len)
} catch {
case t: Throwable =>
// Update the error state
lastError.set(Some(t))
}
}, len)
}

/**
* Flushes all pending writes to the underlying stream. This method blocks until all pending
* writes have been completed. If an error has occurred in the background thread, this method
* will throw an IOException.
*
* If an error has occurred in the background thread and [[lastError]] has been set, this function
* will throw an IOException immediately.
*
* If an error has occurred in the background thread while executing a previous task after the
* current flush has been scheduled, the current flush will fail with the same error.
*/
@throws[IOException]
override def flush(): Unit = {
throwIfError()
ensureOpen()

val f = executor.submit(() => {
throwIfError()
ensureOpen()

delegate.flush()
}, 0)

f.get()
}

/**
* Closes the underlying stream and releases any resources associated with it. All pending writes
* are flushed before closing the stream. This method blocks until all pending writes have been
* completed.
*
* If an error has occurred while flushing, this function will throw an IOException.
*
* If an error has occurred while executing a previous task before this function is called,
* this function will throw the same error. All resources and the underlying stream are still
* guaranteed to be closed.
*/
@throws[IOException]
override def close(): Unit = {
if (!closed) {
Seq[AutoCloseable](
() => {
// Wait for all pending writes to complete
// This will throw an exception if one of the writes fails
flush()
},
() => {
// Give the executor a chance to shutdown gracefully.
executor.shutdownNow(10, TimeUnit.SECONDS)
},
delegate,
() => closed = true).safeClose()
}
}
}
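
A hedged usage sketch of the class above, assuming TrafficController.initialize has already run on the executor (as done in Plugin.scala earlier in this diff) so that getInstance returns the shared controller. The second argument passed to executor.submit above appears to be the number of host-memory bytes the task holds in flight, which is what the spark.rapids.sql.asyncWrite.maxInFlightHostMemoryBytes limit throttles. The file path below is made up.

import java.io.FileOutputStream
import com.nvidia.spark.rapids.io.async.{AsyncOutputStream, TrafficController}

val out = new AsyncOutputStream(
  () => new FileOutputStream("/tmp/example-output.bin"), // opened lazily on the background thread
  TrafficController.getInstance)

out.write("hello".getBytes("UTF-8")) // returns once the write is scheduled, not completed
out.flush()                          // blocks until all scheduled writes have finished
out.close()                          // flushes, shuts down the executor, closes the delegate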
