Skip to content

Commit

Permalink
add metrics GpuPartitioning.CopyToHostTime
Browse files Browse the repository at this point in the history
Signed-off-by: sperlingxx <[email protected]>
  • Loading branch information
sperlingxx committed Dec 17, 2024
1 parent 7e465d8 commit 967d345
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
object GpuPartitioning {
  // Largest CPU batch size allowed: the maximum size of an Array (2147483639, i.e.
  // Int.MaxValue - 8) with a further 2048 held back as overhead for metadata.
  val MaxCpuBatchSize: Long = (Int.MaxValue - 8L) - 2048L

  // Key of the SQLMetric that tracks the memory copy from device to host
  val CopyToHostTime: String = "d2hMemCpyTime"
}

trait GpuPartitioning extends Partitioning {
Expand Down Expand Up @@ -132,7 +135,15 @@ trait GpuPartitioning extends Partitioning {
}
}
withResource(hostPartColumns) { _ =>
Cuda.DEFAULT_STREAM.sync()
lazy val memCpyNvtxRange = memCopyTime.map(
new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, _))
.getOrElse(
new NvtxRange("PartitionD2H", NvtxColor.CYAN))
// Wait for copyToHostAsync
withResource(memCpyNvtxRange) { _ =>
Cuda.DEFAULT_STREAM.sync()
}

// Leaving the GPU for a while
GpuSemaphore.releaseIfNecessary(TaskContext.get())

Expand Down Expand Up @@ -241,4 +252,19 @@ trait GpuPartitioning extends Partitioning {
}
}
}

// Metric registered under GpuPartitioning.CopyToHostTime; None until setupMetrics installs one
private var memCopyTime: Option[GpuMetric] = None

/**
 * Installs the Spark SQL metrics that report the details of GpuPartitioning.
 *
 * Only the entry keyed by `GpuPartitioning.CopyToHostTime` is looked up; if the map does not
 * contain it, nothing changes. This method is expected to be called only once, at the query
 * planning stage.
 *
 * @param metrics available metrics, keyed by metric name
 * @throws IllegalArgumentException if the copy-to-host metric was already set by an earlier call
 */
def setupMetrics(metrics: Map[String, GpuMetric]): Unit = {
  val copyToHostMetric = metrics.get(GpuPartitioning.CopyToHostTime)
  if (copyToHostMetric.isDefined) {
    // Never overwrite a metric that has already been installed
    require(memCopyTime.isEmpty,
      s"The GpuMetric[${GpuPartitioning.CopyToHostTime}] has already been set")
    memCopyTime = copyToHostMetric
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ object GpuShuffleExchangeExecBase {
val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time"
val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime"
val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time"
val METRIC_COPY_TO_HOST_TIME = GpuPartitioning.CopyToHostTime
val METRIC_DESC_COPY_TO_HOST_TIME = "RAPIDS shuffle DeviceToHost copy time"

def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map(
// dataSize and dataReadSize are uncompressed, one is on write and the other on read
Expand Down Expand Up @@ -322,7 +324,9 @@ object GpuShuffleExchangeExecBase {
METRIC_SHUFFLE_SER_COPY_HEADER_TIME ->
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME),
METRIC_SHUFFLE_SER_COPY_BUFFER_TIME ->
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME)
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME),
METRIC_COPY_TO_HOST_TIME ->
gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_COPY_TO_HOST_TIME)
)

def prepareBatchShuffleDependency(
Expand Down Expand Up @@ -364,6 +368,12 @@ object GpuShuffleExchangeExecBase {
rdd
}
val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning)
// Inject detailed metrics, such as the D2H time before SliceOnCpu.
// The injected metrics will be serialized as members of GpuPartitioning.
partitioner match {
case pt: GpuPartitioning => pt.setupMetrics(additionalMetrics)
case _ =>
}
val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME)
def getPartitioned: ColumnarBatch => Any = {
batch => partitionTime.ns {
Expand Down

0 comments on commit 967d345

Please sign in to comment.