Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform handle spill IO outside of locked section in SpillFramework #11880

Open
wants to merge 22 commits into
base: branch-25.02
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.file.StandardOpenOption
import java.util
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable

Expand Down Expand Up @@ -177,14 +178,44 @@ trait SpillableHandle extends StoreHandle {
*/
def spill(): Long

private val spilling = new AtomicBoolean(false)

/**
* Method used to atomically check and set the spilling state, so that anyone who wants to
* actually perform a spill can ensure they are the only one spilling, without having to block
* on the actual spill operation (IO). Only someone who has set spilling to true to perform their
* spill may set it back to false when they are done. (Visible for tests)
*
* This is a separate check from spillable, which actually checks the state of the buffer handle
*
* @param s whether the caller is trying to spill or not (ie finished)
* @return whether the caller is allowed to spill (or true if s is false)
*/
def setSpilling(s: Boolean): Boolean = {
zpuller marked this conversation as resolved.
Show resolved Hide resolved
if (!s) {
if (!spilling.getAndSet(false)) {
throw new IllegalStateException("tried to setSpilling to false while not spilling!")
zpuller marked this conversation as resolved.
Show resolved Hide resolved
}
true
} else {
!spilling.getAndSet(true)
}
}

/**
* Method used to determine whether a handle tracks an object that could be spilled
* @note At the level of `SpillableHandle`, the only requirement of spillability
* is that the size of the handle is > 0. `approxSizeInBytes` is known at
* construction, and is immutable.
* @return true if currently spillable, false otherwise
*/
private[spill] def spillable: Boolean = approxSizeInBytes > 0
private[spill] def spillable: Boolean = {
if (approxSizeInBytes > 0) {
!spilling.get()
} else {
false
}
}
}

/**
Expand All @@ -195,7 +226,7 @@ trait SpillableHandle extends StoreHandle {
trait DeviceSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
private[spill] var dev: Option[T]

private[spill] override def spillable: Boolean = synchronized {
private[spill] override def spillable: Boolean = {
super.spillable && dev.isDefined
zpuller marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -228,7 +259,7 @@ trait DeviceSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
trait HostSpillableHandle[T <: AutoCloseable] extends SpillableHandle {
private[spill] var host: Option[T]

private[spill] override def spillable: Boolean = synchronized {
private[spill] override def spillable: Boolean = {
super.spillable && host.isDefined
}

Expand Down Expand Up @@ -284,7 +315,7 @@ class SpillableHostBufferHandle private (

override val approxSizeInBytes: Long = sizeInBytes

private[spill] override def spillable: Boolean = synchronized {
private[spill] override def spillable: Boolean = {
if (super.spillable) {
host.getOrElse {
throw new IllegalStateException(
Expand Down Expand Up @@ -319,7 +350,7 @@ class SpillableHostBufferHandle private (
}

override def spill(): Long = {
if (!spillable) {
if (!spillable || !setSpilling(true)) {
zpuller marked this conversation as resolved.
Show resolved Hide resolved
0L
} else {
val spilled = synchronized {
Expand All @@ -345,6 +376,8 @@ class SpillableHostBufferHandle private (
0L
}
}
// Make sure to only set spilling to false if it was previously set to true
setSpilling(false)
releaseHostResource()
spilled
}
Expand Down Expand Up @@ -414,7 +447,7 @@ class SpillableDeviceBufferHandle private (

override val approxSizeInBytes: Long = sizeInBytes

private[spill] override def spillable: Boolean = synchronized {
private[spill] override def spillable: Boolean = {
if (super.spillable) {
dev.getOrElse {
zpuller marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException(
Expand Down Expand Up @@ -453,18 +486,21 @@ class SpillableDeviceBufferHandle private (
}

override def spill(): Long = {
if (!spillable) {
if (!spillable || !setSpilling(true)) {
0L
} else {
synchronized {
if (host.isEmpty && dev.isDefined) {
val spilled = if (host.isEmpty && dev.isDefined) {
host = Some(SpillableHostBufferHandle.createHostHandleFromDeviceBuff(dev.get))
sizeInBytes
} else {
0L
}
setSpilling(false)
spilled
}
}
// Make sure to only set spilling to false if it was previously set to true
zpuller marked this conversation as resolved.
Show resolved Hide resolved
}

override def close(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1102,4 +1102,46 @@ class SpillFrameworkSuite
testBufferFileDeletion(canShareDiskPaths = true)
}

test("device handle cannot spill once marked as spilling by another thread") {
val (ct, _) = buildContiguousTable()
val buff = ct.getBuffer
buff.incRefCount()
withResource(SpillableDeviceBufferHandle(buff)) { handle =>
withResource(ct) { _ =>
assert(!handle.spillable)
}
assert(handle.spillable)

// we're just simulating the another thread coming in and spilling here
// so we don't have to worry about a race
assert(handle.setSpilling(true))
// the "other thread is spilling" so we cannot claim the spill lock
assert(!handle.setSpilling(true))
assertResult(0)(SpillFramework.stores.deviceStore.spill(handle.approxSizeInBytes))
assert(handle.setSpilling(false))

// now that nobody else is spilling (but the buffer is still not actually spilled),
// we will succeed
assertResult(512)(SpillFramework.stores.deviceStore.spill(handle.approxSizeInBytes))
}
}

test("host handle cannot spill once marked as spilling by another thread") {
withResource(SpillableHostBufferHandle(HostMemoryBuffer.allocate(512))) { hostHandle =>
assert(hostHandle.spillable)

// we're just simulating the another thread coming in and spilling here
// so we don't have to worry about a race
assert(hostHandle.setSpilling(true))
// the "other thread is spilling" so we cannot claim the spill lock
assert(!hostHandle.setSpilling(true))
assertResult(0)(SpillFramework.stores.hostStore.spill(hostHandle.approxSizeInBytes))
assert(hostHandle.setSpilling(false))

// now that nobody else is spilling (but the buffer is still not actually spilled),
// we will succeed
assertResult(512)(SpillFramework.stores.hostStore.spill(hostHandle.approxSizeInBytes))
}
}

}
Loading