diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala index 6394e2974b4..d2338a91384 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuPartitioning.scala @@ -32,6 +32,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch object GpuPartitioning { // The maximum size of an Array minus a bit for overhead for metadata val MaxCpuBatchSize = 2147483639L - 2048L + + // The SQLMetric key for MemoryCopyFromDeviceToHost + val CopyToHostTime: String = "d2hMemCpyTime" } trait GpuPartitioning extends Partitioning { @@ -132,7 +135,15 @@ trait GpuPartitioning extends Partitioning { } } withResource(hostPartColumns) { _ => - Cuda.DEFAULT_STREAM.sync() + lazy val memCpyNvtxRange = memCopyTime.map( + new NvtxWithMetrics("PartitionD2H", NvtxColor.CYAN, _)) + .getOrElse( + new NvtxRange("PartitionD2H", NvtxColor.CYAN)) + // Wait for copyToHostAsync + withResource(memCpyNvtxRange) { _ => + Cuda.DEFAULT_STREAM.sync() + } + // Leaving the GPU for a while GpuSemaphore.releaseIfNecessary(TaskContext.get()) @@ -241,4 +252,19 @@ trait GpuPartitioning extends Partitioning { } } } + + private var memCopyTime: Option[GpuMetric] = None + + /** + * Setup Spark SQL Metrics for the details of GpuPartition. This method is expected to be called + * at the query planning stage for only once. + */ + def setupMetrics(metrics: Map[String, GpuMetric]): Unit = { + metrics.get(GpuPartitioning.CopyToHostTime).foreach { metric => + // Check and set GpuPartitioning.CopyToHostTime + require(memCopyTime.isEmpty, + s"The GpuMetric[${GpuPartitioning.CopyToHostTime}] has already been set") + memCopyTime = Some(metric) + } + } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 332545a99e1..6fb78d85554 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -294,6 +294,8 @@ object GpuShuffleExchangeExecBase { val METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME = "RAPIDS shuffle serialization copy header time" val METRIC_SHUFFLE_SER_COPY_BUFFER_TIME = "rapidsShuffleSerializationCopyBufferTime" val METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME = "RAPIDS shuffle serialization copy buffer time" + val METRIC_COPY_TO_HOST_TIME = GpuPartitioning.CopyToHostTime + val METRIC_DESC_COPY_TO_HOST_TIME = "RAPIDS shuffle DeviceToHost copy time" def createAdditionalExchangeMetrics(gpu: GpuExec): Map[String, GpuMetric] = Map( // dataSize and dataReadSize are uncompressed, one is on write and the other on read @@ -322,7 +324,9 @@ object GpuShuffleExchangeExecBase { METRIC_SHUFFLE_SER_COPY_HEADER_TIME -> gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_HEADER_TIME), METRIC_SHUFFLE_SER_COPY_BUFFER_TIME -> - gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME) + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_SHUFFLE_SER_COPY_BUFFER_TIME), + METRIC_COPY_TO_HOST_TIME -> + gpu.createNanoTimingMetric(DEBUG_LEVEL, METRIC_DESC_COPY_TO_HOST_TIME) ) def prepareBatchShuffleDependency( @@ -364,6 +368,12 @@ object GpuShuffleExchangeExecBase { rdd } val partitioner: GpuExpression = getPartitioner(newRdd, outputAttributes, newPartitioning) + // Inject detailed Metrics, such as D2HTime before SliceOnCpu + // The injected metrics will be serialized as the members of GpuPartitioning + partitioner match { + case pt: GpuPartitioning => pt.setupMetrics(additionalMetrics) + case _ => + } val partitionTime: GpuMetric = metrics(METRIC_SHUFFLE_PARTITION_TIME) def getPartitioned: ColumnarBatch => Any = { batch => partitionTime.ns {