Skip to content

Commit

Permalink
Change order of initialization so pinned pool is available for spill …
Browse files Browse the repository at this point in the history
…framework buffers

Signed-off-by: Alessandro Bellina <[email protected]>
  • Loading branch information
abellina committed Dec 18, 2024
1 parent 3f26d33 commit 7b5c05b
Showing 1 changed file with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,29 @@ object GpuDeviceManager extends Logging {

private var memoryEventHandler: DeviceMemoryEventHandler = _

private def initializeRmm(gpuId: Int, rapidsConf: Option[RapidsConf]): Unit = {
if (!Rmm.isInitialized) {
val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
private def initializeSpillAndMemoryEvents(conf: RapidsConf): Unit = {
SpillFramework.initialize(conf)

memoryEventHandler = new DeviceMemoryEventHandler(
SpillFramework.stores.deviceStore,
conf.gpuOomDumpDir,
conf.gpuOomMaxRetries)

if (conf.sparkRmmStateEnable) {
val debugLoc = if (conf.sparkRmmDebugLocation.isEmpty) {
null
} else {
conf.sparkRmmDebugLocation
}
RmmSpark.setEventHandler(memoryEventHandler, debugLoc)
} else {
logWarning("SparkRMM retry has been disabled")
Rmm.setEventHandler(memoryEventHandler)
}
}

private def initializeRmmGpuPool(gpuId: Int, conf: RapidsConf): Unit = {
if (!Rmm.isInitialized) {
val poolSize = conf.chunkedPackPoolSize
chunkedPackMemoryResource =
if (poolSize > 0) {
Expand Down Expand Up @@ -391,30 +410,10 @@ object GpuDeviceManager extends Logging {
}
}

SpillFramework.initialize(conf)

memoryEventHandler = new DeviceMemoryEventHandler(
SpillFramework.stores.deviceStore,
conf.gpuOomDumpDir,
conf.gpuOomMaxRetries)

if (conf.sparkRmmStateEnable) {
val debugLoc = if (conf.sparkRmmDebugLocation.isEmpty) {
null
} else {
conf.sparkRmmDebugLocation
}
RmmSpark.setEventHandler(memoryEventHandler, debugLoc)
} else {
logWarning("SparkRMM retry has been disabled")
Rmm.setEventHandler(memoryEventHandler)
}
GpuShuffleEnv.init(conf)
}
}

private def initializeOffHeapLimits(gpuId: Int, rapidsConf: Option[RapidsConf]): Unit = {
val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
private def initializePinnedPoolAndOffHeapLimits(gpuId: Int, conf: RapidsConf): Unit = {
val setCuioDefaultResource = conf.pinnedPoolCuioDefault
val (pinnedSize, nonPinnedLimit) = if (conf.offHeapLimitEnabled) {
logWarning("OFF HEAP MEMORY LIMITS IS ENABLED. " +
Expand Down Expand Up @@ -508,8 +507,13 @@ object GpuDeviceManager extends Logging {
"Cannot initialize memory due to previous shutdown failing")
} else if (singletonMemoryInitialized == Uninitialized) {
val gpu = gpuId.getOrElse(findGpuAndAcquire())
initializeRmm(gpu, rapidsConf)
initializeOffHeapLimits(gpu, rapidsConf)
val conf = rapidsConf.getOrElse(new RapidsConf(SparkEnv.get.conf))
initializePinnedPoolAndOffHeapLimits(gpu, conf)
initializeRmmGpuPool(gpu, conf)
// we want to initialize this last because we want to take advantage
// of pinned memory if it is configured
initializeSpillAndMemoryEvents(conf)
GpuShuffleEnv.init(conf)
singletonMemoryInitialized = Initialized
}
}
Expand Down

0 comments on commit 7b5c05b

Please sign in to comment.