fix rmm managed memory resource initialization to resolve some intermittent … #787

Merged · 2 commits · Dec 9, 2024
2 changes: 1 addition & 1 deletion python/README.md
@@ -10,7 +10,7 @@ First, install RAPIDS cuML per [these instructions](https://rapids.ai/start.html)
```bash
conda create -n rapids-24.10 \
-c rapidsai -c conda-forge -c nvidia \
-    cuml=24.10 cuvs=24.10 python=3.10 cuda-version=11.8
+    cuml=24.10 cuvs=24.10 python=3.10 cuda-version=11.8 numpy~=1.0
```

**Note**: while testing, we recommend using conda or docker to simplify installation and isolate your environment while experimenting. Once you have a working environment, you can then try installing directly, if necessary.
35 changes: 14 additions & 21 deletions python/src/spark_rapids_ml/core.py
@@ -75,6 +75,7 @@
from .params import _CumlParams
from .utils import (
_ArrayOrder,
+_configure_memory_resource,
_get_gpu_id,
_get_spark_session,
_is_local,
@@ -707,15 +708,8 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
# set gpu device
_CumlCommon._set_gpu_device(context, is_local)

-if cuda_managed_mem_enabled:
-    import rmm
-    from rmm.allocators.cupy import rmm_cupy_allocator
-
-    rmm.reinitialize(
-        managed_memory=True,
-        devices=_CumlCommon._get_gpu_device(context, is_local),
-    )
-    cp.cuda.set_allocator(rmm_cupy_allocator)
+# must do after setting gpu device
+_configure_memory_resource(cuda_managed_mem_enabled)

_CumlCommon._initialize_cuml_logging(cuml_verbose)

@@ -1381,18 +1375,8 @@ def _transform_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:

_CumlCommon._set_gpu_device(context, is_local, True)

-if cuda_managed_mem_enabled:
-    import cupy as cp
-    import rmm
-    from rmm.allocators.cupy import rmm_cupy_allocator
-
-    rmm.reinitialize(
-        managed_memory=True,
-        devices=_CumlCommon._get_gpu_device(
-            context, is_local, is_transform=True
-        ),
-    )
-    cp.cuda.set_allocator(rmm_cupy_allocator)
+# must do after setting gpu device
+_configure_memory_resource(cuda_managed_mem_enabled)

# Construct the cuml counterpart object
cuml_instance = construct_cuml_object_func()
@@ -1560,12 +1544,21 @@ def _transform(self, dataset: DataFrame) -> DataFrame:

output_schema = self._out_schema(dataset.schema)

+cuda_managed_mem_enabled = (
+    _get_spark_session().conf.get("spark.rapids.ml.uvm.enabled", "false")
+    == "true"
+)
+if cuda_managed_mem_enabled:
+    get_logger(self.__class__).info("CUDA managed memory enabled.")

@pandas_udf(output_schema) # type: ignore
def predict_udf(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.Series]:
from pyspark import TaskContext

context = TaskContext.get()
_CumlCommon._set_gpu_device(context, is_local, True)
+# must do after setting gpu device
+_configure_memory_resource(cuda_managed_mem_enabled)
cuml_objects = construct_cuml_object_func()
cuml_object = (
cuml_objects[0] if isinstance(cuml_objects, list) else cuml_objects
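As context for the `_transform` change above: `spark.rapids.ml.uvm.enabled` is read from the active Spark session's conf, so enabling managed memory for both fit and transform is just a matter of setting that conf before running an estimator. A minimal sketch follows; the `KMeans` estimator and the toy DataFrame are illustrative placeholders, not part of this PR, and a GPU is required to actually run it.

```python
from pyspark.sql import SparkSession
from spark_rapids_ml.clustering import KMeans

spark = SparkSession.builder.getOrCreate()

# Enable CUDA unified (managed) memory for spark-rapids-ml fit/transform UDFs;
# the library reads this conf as shown in the diff above.
spark.conf.set("spark.rapids.ml.uvm.enabled", "true")

# Toy data: spark-rapids-ml accepts array-typed feature columns.
df = spark.createDataFrame(
    [([0.0, 0.0],), ([1.0, 1.0],), ([9.0, 9.0],), ([10.0, 10.0],)],
    ["features"],
)

model = KMeans(k=2).setFeaturesCol("features").fit(df)
model.transform(df).show()
```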
11 changes: 4 additions & 7 deletions python/src/spark_rapids_ml/umap.py
@@ -87,6 +87,7 @@
from .utils import (
_ArrayOrder,
_concat_and_free,
+_configure_memory_resource,
_get_spark_session,
_is_local,
dtype_to_pyspark_type,
@@ -1110,20 +1111,16 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
import cupy as cp
import cupyx

-if cuda_managed_mem_enabled:
-    import rmm
-    from rmm.allocators.cupy import rmm_cupy_allocator
-
-    rmm.reinitialize(managed_memory=True)
-    cp.cuda.set_allocator(rmm_cupy_allocator)

_CumlCommon._initialize_cuml_logging(cuml_verbose)

context = TaskContext.get()

# set gpu device
_CumlCommon._set_gpu_device(context, is_local)

+# must do after setting gpu device
+_configure_memory_resource(cuda_managed_mem_enabled)

# handle the input
# inputs = [(X, Optional(y)), (X, Optional(y))]
logger.info("Loading data into python worker memory")
15 changes: 15 additions & 0 deletions python/src/spark_rapids_ml/utils.py
@@ -144,6 +144,21 @@ def _get_gpu_id(task_context: TaskContext) -> int:
return gpu_id


+def _configure_memory_resource(uvm_enabled: bool = False) -> None:
+    import cupy as cp
+    import rmm
+    from rmm.allocators.cupy import rmm_cupy_allocator
+
+    if uvm_enabled:
+        if not type(rmm.mr.get_current_device_resource()) == type(
+            rmm.mr.ManagedMemoryResource()
+        ):
+            rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
**Collaborator:** Looks good.

One question: any idea how this line differs from `rmm.reinitialize(managed_memory=True)`?

**Collaborator (author):** Yes. `rmm.reinitialize` also resets the memory resources of previously specified devices back to the default MRs, and I believe that was the source of our problems: it destroyed the original memory resources and repointed the device-to-MR mapping at the new defaults, but only on the Python side. On the C++ side, the corresponding map still pointed at the destroyed MRs (they are the same objects), leaving dangling pointers. That leads to undefined behavior in subsequent C++ RMM calls whenever the device changes within the same process, e.g. between fit and transform with Python worker reuse enabled. Our observations and code inspection were consistent with this.


+        if not cp.cuda.get_allocator().__name__ == rmm_cupy_allocator.__name__:
+            cp.cuda.set_allocator(rmm_cupy_allocator)


def _get_default_params_from_func(
func: Callable, unsupported_set: List[str] = []
) -> Dict[str, Any]:
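To summarize the review discussion above in code: the new helper only touches the memory resource of the device that is already current in the worker (and leaves it alone if it is already managed), instead of letting `rmm.reinitialize` rebuild the Python-side device-to-resource map for previously configured devices while the C++ side keeps pointing at the destroyed resources. Below is a standalone sketch of that pattern for a process that may switch devices between tasks; the helper name and device ids are illustrative (running it as-is needs at least two GPUs), and `isinstance` is used here as a slight simplification of the exact-type check in the PR.

```python
import cupy as cp
import rmm
from rmm.allocators.cupy import rmm_cupy_allocator


def use_managed_memory_on_current_device() -> None:
    """Scope the managed-memory setup to whichever device is already current."""
    # Only replace the resource if the current device is not already managed,
    # mirroring the check in _configure_memory_resource above.
    if not isinstance(rmm.mr.get_current_device_resource(), rmm.mr.ManagedMemoryResource):
        rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
    # Route CuPy allocations through RMM as well.
    cp.cuda.set_allocator(rmm_cupy_allocator)


# e.g. a fit task pinned to GPU 0, then a transform task reusing the worker on GPU 1
for device_id in (0, 1):
    cp.cuda.Device(device_id).use()          # select the device first...
    use_managed_memory_on_current_device()   # ...then configure memory for that device only
    # By contrast, rmm.reinitialize(managed_memory=True, devices=device_id) also resets
    # resources for previously seen devices on the Python side only, which is what the
    # author's comment above identifies as the source of the dangling pointers.
```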