diff --git a/python/README.md b/python/README.md
index 31718ab0..aa9945cc 100644
--- a/python/README.md
+++ b/python/README.md
@@ -10,7 +10,7 @@ First, install RAPIDS cuML per [these instructions](https://rapids.ai/start.html
 ```bash
 conda create -n rapids-24.10 \
     -c rapidsai -c conda-forge -c nvidia \
-    cuml=24.10 cuvs=24.10 python=3.10 cuda-version=11.8
+    cuml=24.10 cuvs=24.10 python=3.10 cuda-version=11.8 numpy~=1.0
 ```
 
 **Note**: while testing, we recommend using conda or docker to simplify installation and isolate your environment while experimenting. Once you have a working environment, you can then try installing directly, if necessary.
diff --git a/python/src/spark_rapids_ml/core.py b/python/src/spark_rapids_ml/core.py
index 644c88a7..91569c36 100644
--- a/python/src/spark_rapids_ml/core.py
+++ b/python/src/spark_rapids_ml/core.py
@@ -75,6 +75,7 @@ from .params import _CumlParams
 from .params import _CumlParams
 from .utils import (
     _ArrayOrder,
+    _configure_memory_resource,
     _get_gpu_id,
     _get_spark_session,
     _is_local,
@@ -707,15 +708,8 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
             # set gpu device
             _CumlCommon._set_gpu_device(context, is_local)
 
-            if cuda_managed_mem_enabled:
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                rmm.reinitialize(
-                    managed_memory=True,
-                    devices=_CumlCommon._get_gpu_device(context, is_local),
-                )
-                cp.cuda.set_allocator(rmm_cupy_allocator)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
 
             _CumlCommon._initialize_cuml_logging(cuml_verbose)
 
@@ -1381,18 +1375,8 @@ def _transform_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
 
             _CumlCommon._set_gpu_device(context, is_local, True)
 
-            if cuda_managed_mem_enabled:
-                import cupy as cp
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                rmm.reinitialize(
-                    managed_memory=True,
-                    devices=_CumlCommon._get_gpu_device(
-                        context, is_local, is_transform=True
-                    ),
-                )
-                cp.cuda.set_allocator(rmm_cupy_allocator)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
 
             # Construct the cuml counterpart object
             cuml_instance = construct_cuml_object_func()
@@ -1560,12 +1544,21 @@ def _transform(self, dataset: DataFrame) -> DataFrame:
 
         output_schema = self._out_schema(dataset.schema)
 
+        cuda_managed_mem_enabled = (
+            _get_spark_session().conf.get("spark.rapids.ml.uvm.enabled", "false")
+            == "true"
+        )
+        if cuda_managed_mem_enabled:
+            get_logger(self.__class__).info("CUDA managed memory enabled.")
+
         @pandas_udf(output_schema)  # type: ignore
         def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
             from pyspark import TaskContext
 
             context = TaskContext.get()
             _CumlCommon._set_gpu_device(context, is_local, True)
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
             cuml_objects = construct_cuml_object_func()
             cuml_object = (
                 cuml_objects[0] if isinstance(cuml_objects, list) else cuml_objects
diff --git a/python/src/spark_rapids_ml/umap.py b/python/src/spark_rapids_ml/umap.py
index 2fc68498..e735aa12 100644
--- a/python/src/spark_rapids_ml/umap.py
+++ b/python/src/spark_rapids_ml/umap.py
@@ -87,6 +87,7 @@ from .utils import (
 from .utils import (
     _ArrayOrder,
     _concat_and_free,
+    _configure_memory_resource,
     _get_spark_session,
     _is_local,
     dtype_to_pyspark_type,
@@ -1110,13 +1111,6 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
             import cupy as cp
             import cupyx
 
-            if cuda_managed_mem_enabled:
-                import rmm
-                from rmm.allocators.cupy import rmm_cupy_allocator
-
-                rmm.reinitialize(managed_memory=True)
-                cp.cuda.set_allocator(rmm_cupy_allocator)
-
             _CumlCommon._initialize_cuml_logging(cuml_verbose)
 
             context = TaskContext.get()
@@ -1124,6 +1118,9 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
             # set gpu device
             _CumlCommon._set_gpu_device(context, is_local)
 
+            # must do after setting gpu device
+            _configure_memory_resource(cuda_managed_mem_enabled)
+
             # handle the input
             # inputs = [(X, Optional(y)), (X, Optional(y))]
             logger.info("Loading data into python worker memory")
diff --git a/python/src/spark_rapids_ml/utils.py b/python/src/spark_rapids_ml/utils.py
index 0edcf7fa..b6ef0f2b 100644
--- a/python/src/spark_rapids_ml/utils.py
+++ b/python/src/spark_rapids_ml/utils.py
@@ -144,6 +144,21 @@ def _get_gpu_id(task_context: TaskContext) -> int:
     return gpu_id
 
 
+def _configure_memory_resource(uvm_enabled: bool = False) -> None:
+    import cupy as cp
+    import rmm
+    from rmm.allocators.cupy import rmm_cupy_allocator
+
+    if uvm_enabled:
+        if not isinstance(
+            rmm.mr.get_current_device_resource(), rmm.mr.ManagedMemoryResource
+        ):
+            rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
+
+        if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
+            cp.cuda.set_allocator(rmm_cupy_allocator)
+
+
 def _get_default_params_from_func(
     func: Callable, unsupported_set: List[str] = []
 ) -> Dict[str, Any]: