diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala index 42bcf23f049..db227ef3350 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/spill/SpillFramework.scala @@ -319,9 +319,6 @@ class SpillableHostBufferHandle private ( } private var toSpill: Option[HostMemoryBuffer] = None - override def releaseHostResource(): Unit = { - super.releaseHostResource() - } override def spill(): Long = { if (!spillable) { @@ -360,7 +357,7 @@ class SpillableHostBufferHandle private ( val staging = Some(diskHandleBuilder.build) synchronized { disk = staging - // really what we could to is remove it from the store/tracker here + // really what we could do is remove it from the store/tracker here // but only close the buffer outside of sync block // edit: actually we already get that for free by having toSpill track it separately releaseHostResource() @@ -371,13 +368,7 @@ class SpillableHostBufferHandle private ( } else { 0 } - // only release toSpill at the end of the spill op, not when close()ing - // actually did we already close it above due to the withResource -// toSpill.foreach(_.close()) toSpill = None - // is this a leak (before my changes)? what if someone comes in and - // grabs host now, but we release -// releaseHostResource() spilled } } @@ -603,6 +594,7 @@ class SpillableColumnarBatchHandle private ( } } + var closed = false override def spill(): Long = { if (!spillable) { 0L @@ -621,9 +613,11 @@ class SpillableColumnarBatchHandle private ( meta = Some(chunkedPacker.getPackedMeta) val staging = Some(SpillableHostBufferHandle.createHostHandleWithPacker(chunkedPacker)) synchronized { - host = staging - // set spilled to dev instead of toSpill so that if dev was already closed during spill - // , we won't re-close + if (!closed) { + host = staging + } + // set spilled to dev instead of toSpill so that if dev was already closed during spill, + // we won't re-close spilled = dev dev = None toSpill = None @@ -659,12 +653,11 @@ class SpillableColumnarBatchHandle private ( override def close(): Unit = { synchronized { if (toSpill.isEmpty) { - // also, does this break if a kernel is running if no spill? releaseDeviceResource() } - // if we are currently spilling, this won't be closed properly host.foreach(_.close()) host = None + closed = true } } } @@ -758,6 +751,7 @@ class SpillableColumnarBatchFromBufferHandle private ( private var toSpill: Option[ColumnarBatch] = None private var spilled: Option[ColumnarBatch] = None + var closed = false override def spill(): Long = { if (!spillable) { 0 @@ -780,9 +774,11 @@ class SpillableColumnarBatchFromBufferHandle private ( cvFromBuffer.getBuffer)) // probably can pull out some duplicated code synchronized { - host = staging - // set spilled to dev instead of toSpill so that if dev was already closed during spill - // , we won't re-close + if (!closed) { + host = staging + } + // set spilled to dev instead of toSpill so that if dev was already closed during spill, + // we won't re-close spilled = dev dev = None toSpill = None @@ -798,12 +794,11 @@ class SpillableColumnarBatchFromBufferHandle private ( override def close(): Unit = { synchronized { if (toSpill.isEmpty) { - // also, does this break if a kernel is running if no spill? releaseDeviceResource() } - // if we are currently spilling, this won't be closed properly host.foreach(_.close()) host = None + closed = true } } } @@ -875,6 +870,8 @@ class SpillableCompressedColumnarBatchHandle private ( spilled = None } } + + var closed = false override def spill(): Long = { if (!spillable) { 0L @@ -896,9 +893,11 @@ class SpillableCompressedColumnarBatchHandle private ( val staging = Some(SpillableHostBufferHandle.createHostHandleFromDeviceBuff( cvFromBuffer.getTableBuffer)) synchronized { - host = staging - // set spilled to dev instead of toSpill so that if dev was already closed during spill - // , we won't re-close + if (!closed) { + host = staging + } + // set spilled to dev instead of toSpill so that if dev was already closed during spill, + // we won't re-close spilled = dev dev = None toSpill = None @@ -914,12 +913,11 @@ class SpillableCompressedColumnarBatchHandle private ( override def close(): Unit = { synchronized { if (toSpill.isEmpty) { - // also, does this break if a kernel is running if no spill? releaseDeviceResource() } - // if we are currently spilling, this won't be closed properly host.foreach(_.close()) host = None + closed = true } } } @@ -1022,6 +1020,8 @@ class SpillableHostColumnarBatchHandle private ( releaseHostResource() synchronized { disk.foreach(_.close()) + // not properly closed if spilling + // need to check other handle types as well disk = None } }