Skip to content

Commit

Permalink
host watermark metric (#11725)
Browse files Browse the repository at this point in the history
* host watermark metric

Signed-off-by: Zach Puller <[email protected]>

* make disk and host trackers global

Signed-off-by: Zach Puller <[email protected]>

---------

Signed-off-by: Zach Puller <[email protected]>
  • Loading branch information
zpuller authored Nov 22, 2024
1 parent e5547a1 commit cacc3ae
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 8 deletions.
20 changes: 20 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostAlloc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package com.nvidia.spark.rapids
import ai.rapids.cudf.{DefaultHostMemoryAllocator, HostMemoryAllocator, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
import com.nvidia.spark.rapids.jni.{CpuRetryOOM, RmmSpark}

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.GpuTaskMetrics

private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with Logging {
private var currentNonPinnedAllocated: Long = 0L
Expand Down Expand Up @@ -52,17 +54,32 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
}
}

/**
 * Builds the trace-log message describing the host memory tracked by `metrics`.
 *
 * @param metrics metrics object whose host byte counters are reported
 * @return a message containing the current task attempt id, the host bytes currently
 *         allocated and the maximum recorded by `metrics`; or a fixed message when
 *         `TaskContext.get()` returns null
 */
private def getHostAllocMetricsLogStr(metrics: GpuTaskMetrics): String = {
  Option(TaskContext.get()) match {
    case Some(ctx) =>
      val attemptId = ctx.taskAttemptId()
      val allocatedNow = metrics.getHostBytesAllocated
      val allocatedMax = metrics.getMaxHostBytesAllocated
      s"total size for task $attemptId is $allocatedNow, max size is $allocatedMax"
    case None =>
      "allocated memory outside of a task context"
  }
}

/**
 * Bookkeeping for releasing `amount` bytes of pinned memory at address `ptr`.
 *
 * Lowers this allocator's pinned byte count, lowers the host byte counter through
 * GpuTaskMetrics, trace-logs the resulting totals and finally calls
 * `RmmSpark.cpuDeallocate`.
 *
 * @param ptr    address passed on to `RmmSpark.cpuDeallocate`
 * @param amount number of bytes being released
 */
private def releasePinned(ptr: Long, amount: Long): Unit = {
  // Only the allocator's own counter is updated inside the synchronized block.
  synchronized { currentPinnedAllocated -= amount }
  val taskMetrics = GpuTaskMetrics.get
  taskMetrics.decHostBytesAllocated(amount)
  logTrace(getHostAllocMetricsLogStr(taskMetrics))
  RmmSpark.cpuDeallocate(ptr, amount)
}

/**
 * Bookkeeping for releasing `amount` bytes of non-pinned memory at address `ptr`.
 *
 * Lowers this allocator's non-pinned byte count, lowers the host byte counter through
 * GpuTaskMetrics, trace-logs the resulting totals and finally calls
 * `RmmSpark.cpuDeallocate`.
 *
 * @param ptr    address passed on to `RmmSpark.cpuDeallocate`
 * @param amount number of bytes being released
 */
private def releaseNonPinned(ptr: Long, amount: Long): Unit = {
  // Only the allocator's own counter is updated inside the synchronized block.
  synchronized { currentNonPinnedAllocated -= amount }
  val taskMetrics = GpuTaskMetrics.get
  taskMetrics.decHostBytesAllocated(amount)
  logTrace(getHostAllocMetricsLogStr(taskMetrics))
  RmmSpark.cpuDeallocate(ptr, amount)
}

Expand Down Expand Up @@ -186,6 +203,9 @@ private class HostAlloc(nonPinnedLimit: Long) extends HostMemoryAllocator with L
allocAttemptFinishedWithoutException = true
} finally {
if (ret.isDefined) {
val metrics = GpuTaskMetrics.get
metrics.incHostBytesAllocated(amount)
logTrace(getHostAllocMetricsLogStr(metrics))
RmmSpark.postCpuAllocSuccess(ret.get.getAddress, amount, blocking, isRecursive)
} else {
// shouldRetry should indicate if spill did anything for us and we should try again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,25 +121,38 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

// Accumulators registered in this class's metrics map under the names
// "gpuMaxDeviceMemoryBytes", "gpuMaxHostMemoryBytes" and "gpuMaxDiskMemoryBytes".
private val maxDeviceMemoryBytes = new HighWatermarkAccumulator
private val maxHostMemoryBytes = new HighWatermarkAccumulator
private val maxDiskMemoryBytes = new HighWatermarkAccumulator

// NOTE(review): per-instance disk counter; the GpuTaskMetrics companion object also keeps a
// diskBytesAllocated total, so verify whether this field is still needed.
private var diskBytesAllocated: Long = 0
// Largest host byte total this instance has recorded in incHostBytesAllocated.
private var maxHostBytesAllocated: Long = 0

// Largest disk byte total this instance has recorded in incDiskBytesAllocated.
private var maxDiskBytesAllocated: Long = 0

/**
 * Disk bytes currently allocated.
 *
 * Reads the total kept on the GpuTaskMetrics companion object, which is the counter that
 * the companion's incDiskBytesAllocated / decDiskBytesAllocated update. The accessor is
 * defined exactly once; a second definition returning a per-instance counter would both
 * clash with this one and report a different number.
 *
 * @return the companion object's current disk byte total
 */
def getDiskBytesAllocated: Long = GpuTaskMetrics.diskBytesAllocated

/** Returns the largest disk byte total this instance has recorded so far. */
def getMaxDiskBytesAllocated: Long = maxDiskBytesAllocated

/** Returns the host byte total kept on the GpuTaskMetrics companion object. */
def getHostBytesAllocated: Long = GpuTaskMetrics.hostBytesAllocated

/** Returns the largest host byte total this instance has recorded so far. */
def getMaxHostBytesAllocated: Long = maxHostBytesAllocated

/**
 * Records `bytes` more host memory as allocated.
 *
 * Adds to the host byte total on the GpuTaskMetrics companion object, then raises this
 * instance's recorded maximum when the companion's total is larger.
 *
 * @param bytes number of bytes that were just allocated
 */
def incHostBytesAllocated(bytes: Long): Unit = {
  GpuTaskMetrics.incHostBytesAllocated(bytes)
  val highest = math.max(maxHostBytesAllocated, GpuTaskMetrics.hostBytesAllocated)
  maxHostBytesAllocated = highest
}

/**
 * Records `bytes` of host memory as released by subtracting them from the host byte total
 * on the GpuTaskMetrics companion object. The recorded maximum is left untouched.
 *
 * @param bytes number of bytes that were just released
 */
def decHostBytesAllocated(bytes: Long): Unit =
  GpuTaskMetrics.decHostBytesAllocated(bytes)


/**
 * Records `bytes` more disk space as allocated.
 *
 * Adds to the disk byte total on the GpuTaskMetrics companion object, then raises this
 * instance's recorded maximum when the companion's total is larger. The companion counter
 * is the only counter updated here, mirroring incHostBytesAllocated, so the maximum is
 * always derived from the same total that getDiskBytesAllocated reports.
 *
 * @param bytes number of bytes that were just allocated
 */
def incDiskBytesAllocated(bytes: Long): Unit = {
  GpuTaskMetrics.incDiskBytesAllocated(bytes)
  maxDiskBytesAllocated = maxDiskBytesAllocated.max(GpuTaskMetrics.diskBytesAllocated)
}

/**
 * Records `bytes` of disk space as released by subtracting them from the disk byte total
 * on the GpuTaskMetrics companion object. The recorded maximum is left untouched.
 *
 * @param bytes number of bytes that were just released
 */
def decDiskBytesAllocated(bytes: Long): Unit = {
  // Must target the DISK counter: decrementing the host counter here would leave the disk
  // total permanently inflated and push the host total below its real value.
  GpuTaskMetrics.decDiskBytesAllocated(bytes)
}

private val metrics = Map[String, AccumulatorV2[_, _]](
Expand All @@ -153,6 +166,7 @@ class GpuTaskMetrics extends Serializable {
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes,
"gpuMaxHostMemoryBytes" -> maxHostMemoryBytes,
"gpuMaxDiskMemoryBytes" -> maxDiskMemoryBytes
)

Expand Down Expand Up @@ -242,6 +256,9 @@ class GpuTaskMetrics extends Serializable {
// add method instead of adding a dedicated max method to the accumulator.
maxDeviceMemoryBytes.add(maxMem)
}
if (maxHostBytesAllocated > 0) {
maxHostMemoryBytes.add(maxHostBytesAllocated)
}
if (maxDiskBytesAllocated > 0) {
maxDiskMemoryBytes.add(maxDiskBytesAllocated)
}
Expand All @@ -254,6 +271,25 @@ class GpuTaskMetrics extends Serializable {
object GpuTaskMetrics extends Logging {
private val taskLevelMetrics = mutable.Map[Long, GpuTaskMetrics]()

// Running host and disk byte totals held on the companion object rather than on individual
// GpuTaskMetrics instances. In the code visible here they are modified only by the
// synchronized inc/dec methods of this object.
private var hostBytesAllocated: Long = 0
private var diskBytesAllocated: Long = 0

/** Adds `bytes` to the host byte total while holding this object's monitor. */
private def incHostBytesAllocated(bytes: Long): Unit = {
  synchronized {
    hostBytesAllocated = hostBytesAllocated + bytes
  }
}

/** Subtracts `bytes` from the host byte total while holding this object's monitor. */
private def decHostBytesAllocated(bytes: Long): Unit = {
  synchronized {
    hostBytesAllocated = hostBytesAllocated - bytes
  }
}

/** Adds `bytes` to the disk byte total while holding this object's monitor. */
def incDiskBytesAllocated(bytes: Long): Unit = {
  synchronized {
    diskBytesAllocated = diskBytesAllocated + bytes
  }
}

/** Subtracts `bytes` from the disk byte total while holding this object's monitor. */
def decDiskBytesAllocated(bytes: Long): Unit = {
  synchronized {
    diskBytesAllocated = diskBytesAllocated - bytes
  }
}

def registerOnTask(metrics: GpuTaskMetrics): Unit = synchronized {
val tc = TaskContext.get()
if (tc != null) {
Expand Down

0 comments on commit cacc3ae

Please sign in to comment.