
Commit 8fa85d6
move memory resource setting to function and invoke in predict as well
Signed-off-by: Erik Ordentlich <[email protected]>
eordentlich committed Dec 9, 2024
1 parent ff25f8f commit 8fa85d6
Showing 3 changed files with 33 additions and 38 deletions.
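
The managed-memory behavior touched by this commit is driven by a Spark conf read on the driver (the conf key appears in the _transform hunk below; the session setup around it is illustrative, not code from this repo):

    from pyspark.sql import SparkSession

    # Enable CUDA unified/managed memory (UVM) for spark-rapids-ml workers.
    # With this set, fit, transform, and now predict all route GPU allocations
    # through an RMM ManagedMemoryResource, which permits oversubscribing GPU memory.
    spark = (
        SparkSession.builder.config("spark.rapids.ml.uvm.enabled", "true").getOrCreate()
    )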
39 changes: 14 additions & 25 deletions python/src/spark_rapids_ml/core.py
@@ -75,6 +75,7 @@
 from .params import _CumlParams
 from .utils import (
     _ArrayOrder,
+    _configure_memory_resource,
     _get_gpu_id,
     _get_spark_session,
     _is_local,
@@ -707,18 +708,8 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
             # set gpu device
             _CumlCommon._set_gpu_device(context, is_local)
 
-            if cuda_managed_mem_enabled:
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                # avoid initializing these twice to avoid downstream segfaults and other cuda memory errors
-                if not type(rmm.mr.get_current_device_resource()) == type(
-                    rmm.mr.ManagedMemoryResource()
-                ):
-                    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
-
-                if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
-                    cp.cuda.set_allocator(rmm_cupy_allocator)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
 
             _CumlCommon._initialize_cuml_logging(cuml_verbose)
 
@@ -1384,19 +1375,8 @@ def _transform_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
 
             _CumlCommon._set_gpu_device(context, is_local, True)
 
-            if cuda_managed_mem_enabled:
-                import cupy as cp
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                # avoid initializing these twice to avoid downstream segfaults and other cuda memory errors
-                if not type(rmm.mr.get_current_device_resource()) == type(
-                    rmm.mr.ManagedMemoryResource()
-                ):
-                    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
-
-                if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
-                    cp.cuda.set_allocator(rmm_cupy_allocator)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
 
             # Construct the cuml counterpart object
             cuml_instance = construct_cuml_object_func()
@@ -1564,12 +1544,21 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
 
         output_schema = self._out_schema(dataset.schema)
 
+        cuda_managed_mem_enabled = (
+            _get_spark_session().conf.get("spark.rapids.ml.uvm.enabled", "false")
+            == "true"
+        )
+        if cuda_managed_mem_enabled:
+            get_logger(self.__class__).info("CUDA managed memory enabled.")
+
         @pandas_udf(output_schema)  # type: ignore
         def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
             from pyspark import TaskContext
 
             context = TaskContext.get()
             _CumlCommon._set_gpu_device(context, is_local, True)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
             cuml_objects = construct_cuml_object_func()
             cuml_object = (
                 cuml_objects[0] if isinstance(cuml_objects, list) else cuml_objects
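
A design note on the _transform hunk above: the conf is resolved once on the driver, and only the resulting boolean is captured by the pandas UDF closure, so executors never need a SparkSession of their own. The same pattern in miniature (a standalone sketch; names are hypothetical):

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    flag = True  # resolved on the driver, e.g. from spark.conf

    @pandas_udf("double")
    def example_udf(it: Iterator[pd.Series]) -> Iterator[pd.Series]:
        # flag is pickled with the closure; each executor sees the driver's value
        for s in it:
            yield s * (1.0 if flag else 0.0)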
17 changes: 4 additions & 13 deletions python/src/spark_rapids_ml/umap.py
@@ -87,6 +87,7 @@
 from .utils import (
     _ArrayOrder,
     _concat_and_free,
+    _configure_memory_resource,
     _get_spark_session,
     _is_local,
     dtype_to_pyspark_type,
@@ -1110,26 +1111,16 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
             import cupy as cp
             import cupyx
 
-            if cuda_managed_mem_enabled:
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                # avoid initializing these twice to avoid downstream segfaults and other cuda memory errors
-                if not type(rmm.mr.get_current_device_resource()) == type(
-                    rmm.mr.ManagedMemoryResource()
-                ):
-                    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
-
-                if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
-                    cp.cuda.set_allocator(rmm_cupy_allocator)
-
             _CumlCommon._initialize_cuml_logging(cuml_verbose)
 
             context = TaskContext.get()
 
             # set gpu device
             _CumlCommon._set_gpu_device(context, is_local)
 
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
+
             # handle the input
             # inputs = [(X, Optional(y)), (X, Optional(y))]
             logger.info("Loading data into python worker memory")
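
A note on the repeated "must do after setting gpu device" comment: RMM binds a memory resource to whichever CUDA device is current when set_current_device_resource is called, so the task's assigned GPU has to be selected first. An illustrative ordering (a standalone sketch, not code from this repo; gpu_id is a placeholder — in this repo it comes from the TaskContext via _get_gpu_id):

    import cupy as cp
    import rmm
    from rmm.allocators.cupy import rmm_cupy_allocator

    gpu_id = 0  # placeholder for the GPU assigned to this Spark task
    cp.cuda.Device(gpu_id).use()  # make the assigned GPU current first
    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())  # binds to the current device
    cp.cuda.set_allocator(rmm_cupy_allocator)  # route CuPy allocations through RMM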
15 changes: 15 additions & 0 deletions python/src/spark_rapids_ml/utils.py
@@ -144,6 +144,21 @@ def _get_gpu_id(task_context: TaskContext) -> int:
     return gpu_id
 
 
+def _configure_memory_resource(uvm_enabled: bool = False) -> None:
+    import cupy as cp
+    import rmm
+    from rmm.allocators.cupy import rmm_cupy_allocator
+
+    if uvm_enabled:
+        if not type(rmm.mr.get_current_device_resource()) == type(
+            rmm.mr.ManagedMemoryResource()
+        ):
+            rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
+
+        if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
+            cp.cuda.set_allocator(rmm_cupy_allocator)
+
+
 def _get_default_params_from_func(
     func: Callable, unsupported_set: List[str] = []
 ) -> Dict[str, Any]:
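
Because both guards in _configure_memory_resource check the current state before mutating it, the helper is idempotent, which is what makes it safe to invoke from fit, transform, and predict on the same executor process. A small sketch of that property (assumes a CUDA-capable environment with rmm installed):

    import rmm

    from spark_rapids_ml.utils import _configure_memory_resource

    _configure_memory_resource(uvm_enabled=True)
    _configure_memory_resource(uvm_enabled=True)  # second call is a no-op
    assert isinstance(rmm.mr.get_current_device_resource(), rmm.mr.ManagedMemoryResource)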
