From 8fa85d6e14ddfdf9943d82dd21cac88cb651fea5 Mon Sep 17 00:00:00 2001
From: Erik Ordentlich
Date: Mon, 9 Dec 2024 09:25:08 -0800
Subject: [PATCH] move memory resource setting to function and invoke in
 predict as well

Signed-off-by: Erik Ordentlich
---
 python/src/spark_rapids_ml/core.py  | 39 +++++++++++------------------
 python/src/spark_rapids_ml/umap.py  | 17 +++----------
 python/src/spark_rapids_ml/utils.py | 15 +++++++++++
 3 files changed, 33 insertions(+), 38 deletions(-)

diff --git a/python/src/spark_rapids_ml/core.py b/python/src/spark_rapids_ml/core.py
index 76d877c5..91569c36 100644
--- a/python/src/spark_rapids_ml/core.py
+++ b/python/src/spark_rapids_ml/core.py
@@ -75,6 +75,7 @@
 from .params import _CumlParams
 from .utils import (
     _ArrayOrder,
+    _configure_memory_resource,
     _get_gpu_id,
     _get_spark_session,
     _is_local,
@@ -707,18 +708,8 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
             # set gpu device
             _CumlCommon._set_gpu_device(context, is_local)
 
-            if cuda_managed_mem_enabled:
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                # avoid initializing these twice to avoid downstream segfaults and other cuda memory errors
-                if not type(rmm.mr.get_current_device_resource()) == type(
-                    rmm.mr.ManagedMemoryResource()
-                ):
-                    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
-
-                if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
-                    cp.cuda.set_allocator(rmm_cupy_allocator)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
 
             _CumlCommon._initialize_cuml_logging(cuml_verbose)
 
@@ -1384,19 +1375,8 @@ def _transform_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
 
             _CumlCommon._set_gpu_device(context, is_local, True)
 
-            if cuda_managed_mem_enabled:
-                import cupy as cp
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                # avoid initializing these twice to avoid downstream segfaults and other cuda memory errors
-                if not type(rmm.mr.get_current_device_resource()) == type(
-                    rmm.mr.ManagedMemoryResource()
-                ):
-                    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
-
-                if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
-                    cp.cuda.set_allocator(rmm_cupy_allocator)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
 
             # Construct the cuml counterpart object
             cuml_instance = construct_cuml_object_func()
@@ -1564,12 +1544,21 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
 
         output_schema = self._out_schema(dataset.schema)
 
+        cuda_managed_mem_enabled = (
+            _get_spark_session().conf.get("spark.rapids.ml.uvm.enabled", "false")
+            == "true"
+        )
+        if cuda_managed_mem_enabled:
+            get_logger(self.__class__).info("CUDA managed memory enabled.")
+
         @pandas_udf(output_schema)  # type: ignore
         def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
             from pyspark import TaskContext
 
             context = TaskContext.get()
             _CumlCommon._set_gpu_device(context, is_local, True)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
             cuml_objects = construct_cuml_object_func()
             cuml_object = (
                 cuml_objects[0] if isinstance(cuml_objects, list) else cuml_objects
diff --git a/python/src/spark_rapids_ml/umap.py b/python/src/spark_rapids_ml/umap.py
index c1a282d9..e735aa12 100644
--- a/python/src/spark_rapids_ml/umap.py
+++ b/python/src/spark_rapids_ml/umap.py
@@ -87,6 +87,7 @@
 from .utils import (
     _ArrayOrder,
     _concat_and_free,
+    _configure_memory_resource,
     _get_spark_session,
     _is_local,
     dtype_to_pyspark_type,
@@ -1110,19 +1111,6 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
             import cupy as cp
             import cupyx
 
-            if cuda_managed_mem_enabled:
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                # avoid initializing these twice to avoid downstream segfaults and other cuda memory errors
-                if not type(rmm.mr.get_current_device_resource()) == type(
-                    rmm.mr.ManagedMemoryResource()
-                ):
-                    rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
-
-                if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
-                    cp.cuda.set_allocator(rmm_cupy_allocator)
-
             _CumlCommon._initialize_cuml_logging(cuml_verbose)
 
             context = TaskContext.get()
@@ -1130,6 +1118,9 @@
             # set gpu device
             _CumlCommon._set_gpu_device(context, is_local)
 
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
+
             # handle the input
             # inputs = [(X, Optional(y)), (X, Optional(y))]
             logger.info("Loading data into python worker memory")
diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py
index 0edcf7fa..b6ef0f2b 100644
--- a/python/src/spark_rapids_ml/utils.py
+++ b/python/src/spark_rapids_ml/utils.py
@@ -144,6 +144,21 @@ def _get_gpu_id(task_context: TaskContext) -> int:
     return gpu_id
 
 
+def _configure_memory_resource(uvm_enabled: bool = False) -> None:
+    import cupy as cp
+    import rmm
+    from rmm.allocators.cupy import rmm_cupy_allocator
+
+    if uvm_enabled:
+        if not type(rmm.mr.get_current_device_resource()) == type(
+            rmm.mr.ManagedMemoryResource()
+        ):
+            rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
+
+        if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
+            cp.cuda.set_allocator(rmm_cupy_allocator)
+
+
 def _get_default_params_from_func(
     func: Callable, unsupported_set: List[str] = []
 ) -> Dict[str, Any]:
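
Note on the new helper: _configure_memory_resource is idempotent (it only swaps in rmm.mr.ManagedMemoryResource and the RMM cupy allocator when they are not already active), so calling it from the fit, transform, and now predict UDFs on the same worker is safe. A minimal sketch of the driver-side setting that exercises this path, assuming a GPU cluster with spark-rapids-ml, RMM, and cupy installed; only the config key comes from the patch, the app name is illustrative:

    # Hypothetical usage sketch; "spark.rapids.ml.uvm.enabled" is the key read
    # in _transform above, everything else here is an assumption.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.appName("uvm-demo")
        # With this set to "true", each worker-side UDF calls
        # _configure_memory_resource(True) right after binding its GPU,
        # enabling CUDA unified (managed) memory for RMM and cupy.
        .config("spark.rapids.ml.uvm.enabled", "true")
        .getOrCreate()
    )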