Skip to content

Commit

Permalink
cleaning
Browse files Browse the repository at this point in the history
Signed-off-by: Zach Puller <[email protected]>
  • Loading branch information
zpuller committed Jan 13, 2025
1 parent 584b5e4 commit c33ce71
Showing 1 changed file with 25 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,6 @@ class SpillableHostBufferHandle private (
}

private var toSpill: Option[HostMemoryBuffer] = None
override def releaseHostResource(): Unit = {
super.releaseHostResource()
}

override def spill(): Long = {
if (!spillable) {
Expand Down Expand Up @@ -360,7 +357,7 @@ class SpillableHostBufferHandle private (
val staging = Some(diskHandleBuilder.build)
synchronized {
disk = staging
// really what we could to is remove it from the store/tracker here
// really what we could do is remove it from the store/tracker here
// but only close the buffer outside of sync block
// edit: actually we already get that for free by having toSpill track it separately
releaseHostResource()
Expand All @@ -371,13 +368,7 @@ class SpillableHostBufferHandle private (
} else {
0
}
// only release toSpill at the end of the spill op, not when close()ing
// actually did we already close it above due to the withResource
// toSpill.foreach(_.close())
toSpill = None
// is this a leak (before my changes)? what if someone comes in and
// grabs host now, but we release
// releaseHostResource()
spilled
}
}
Expand Down Expand Up @@ -603,6 +594,7 @@ class SpillableColumnarBatchHandle private (
}
}

var closed = false
override def spill(): Long = {
if (!spillable) {
0L
Expand All @@ -621,9 +613,11 @@ class SpillableColumnarBatchHandle private (
meta = Some(chunkedPacker.getPackedMeta)
val staging = Some(SpillableHostBufferHandle.createHostHandleWithPacker(chunkedPacker))
synchronized {
host = staging
// set spilled to dev instead of toSpill so that if dev was already closed during spill
// , we won't re-close
if (!closed) {
host = staging
}
// set spilled to dev instead of toSpill so that if dev was already closed during spill,
// we won't re-close
spilled = dev
dev = None
toSpill = None
Expand Down Expand Up @@ -659,12 +653,11 @@ class SpillableColumnarBatchHandle private (
override def close(): Unit = {
synchronized {
if (toSpill.isEmpty) {
// also, does this break if a kernel is running if no spill?
releaseDeviceResource()
}
// if we are currently spilling, this won't be closed properly
host.foreach(_.close())
host = None
closed = true
}
}
}
Expand Down Expand Up @@ -758,6 +751,7 @@ class SpillableColumnarBatchFromBufferHandle private (

private var toSpill: Option[ColumnarBatch] = None
private var spilled: Option[ColumnarBatch] = None
var closed = false
override def spill(): Long = {
if (!spillable) {
0
Expand All @@ -780,9 +774,11 @@ class SpillableColumnarBatchFromBufferHandle private (
cvFromBuffer.getBuffer))
// probably can pull out some duplicated code
synchronized {
host = staging
// set spilled to dev instead of toSpill so that if dev was already closed during spill
// , we won't re-close
if (!closed) {
host = staging
}
// set spilled to dev instead of toSpill so that if dev was already closed during spill,
// we won't re-close
spilled = dev
dev = None
toSpill = None
Expand All @@ -798,12 +794,11 @@ class SpillableColumnarBatchFromBufferHandle private (
override def close(): Unit = {
synchronized {
if (toSpill.isEmpty) {
// also, does this break if a kernel is running if no spill?
releaseDeviceResource()
}
// if we are currently spilling, this won't be closed properly
host.foreach(_.close())
host = None
closed = true
}
}
}
Expand Down Expand Up @@ -875,6 +870,8 @@ class SpillableCompressedColumnarBatchHandle private (
spilled = None
}
}

var closed = false
override def spill(): Long = {
if (!spillable) {
0L
Expand All @@ -896,9 +893,11 @@ class SpillableCompressedColumnarBatchHandle private (
val staging = Some(SpillableHostBufferHandle.createHostHandleFromDeviceBuff(
cvFromBuffer.getTableBuffer))
synchronized {
host = staging
// set spilled to dev instead of toSpill so that if dev was already closed during spill
// , we won't re-close
if (!closed) {
host = staging
}
// set spilled to dev instead of toSpill so that if dev was already closed during spill,
// we won't re-close
spilled = dev
dev = None
toSpill = None
Expand All @@ -914,12 +913,11 @@ class SpillableCompressedColumnarBatchHandle private (
override def close(): Unit = {
synchronized {
if (toSpill.isEmpty) {
// also, does this break if a kernel is running if no spill?
releaseDeviceResource()
}
// if we are currently spilling, this won't be closed properly
host.foreach(_.close())
host = None
closed = true
}
}
}
Expand Down Expand Up @@ -1022,6 +1020,8 @@ class SpillableHostColumnarBatchHandle private (
releaseHostResource()
synchronized {
disk.foreach(_.close())
// not properly closed if spilling
// need to check other handle types as well
disk = None
}
}
Expand Down

0 comments on commit c33ce71

Please sign in to comment.