From f9624bedbf9e9e9ac73e08340ab27bffe23ad36b Mon Sep 17 00:00:00 2001
From: "Rishi C." <77904151+rishic3@users.noreply.github.com>
Date: Tue, 14 Jan 2025 11:31:18 -0500
Subject: [PATCH] Avoid local dir creation, ensure dense array ordering during
 UMAP save() (#823)

Signed-off-by: Rishi Chandra
---
 python/src/spark_rapids_ml/umap.py | 43 +++++++++++++++++++-----------
 python/tests/test_umap.py          | 41 ++++++++++++++++++++++++++--
 2 files changed, 66 insertions(+), 18 deletions(-)

diff --git a/python/src/spark_rapids_ml/umap.py b/python/src/spark_rapids_ml/umap.py
index 39a77d85..a3539c25 100644
--- a/python/src/spark_rapids_ml/umap.py
+++ b/python/src/spark_rapids_ml/umap.py
@@ -790,8 +790,9 @@ class UMAP(UMAPClass, _CumlEstimatorSupervised, _UMAPCumlParams):
 
     sample_fraction : float (optional, default=1.0)
         The fraction of the dataset to be used for fitting the model. Since fitting is done on a single node, very large
-        datasets must be subsampled to fit within the node's memory and execute in a reasonable time. Smaller fractions
-        will result in faster training, but may result in sub-optimal embeddings.
+        datasets must be subsampled to fit within the node's memory. Smaller fractions will result in faster training, but
+        may decrease embedding quality. Note: this is not guaranteed to provide exactly the fraction specified of the total
+        count of the given DataFrame.
 
     featuresCol: str or List[str]
         The feature column names, spark-rapids-ml supports vector, array and columnar as the input.\n
@@ -1463,22 +1464,30 @@ def write_sparse_array(array: scipy.sparse.spmatrix, df_dir: str) -> None:
                 schema=indices_data_schema,
             )
 
-            indptr_df.write.parquet(
-                os.path.join(df_dir, "indptr.parquet"), mode="overwrite"
-            )
-            indices_data_df.write.parquet(
-                os.path.join(df_dir, "indices_data.parquet"), mode="overwrite"
-            )
+            indptr_df.write.parquet(os.path.join(df_dir, "indptr.parquet"))
+            indices_data_df.write.parquet(os.path.join(df_dir, "indices_data.parquet"))
 
         def write_dense_array(array: np.ndarray, df_path: str) -> None:
+            assert (
+                spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "true"
+            ), "spark.sql.execution.arrow.pyspark.enabled must be set to true to persist array attributes"
+
             schema = StructType(
                 [
-                    StructField(f"_{i}", FloatType(), False)
-                    for i in range(1, array.shape[1] + 1)
+                    StructField("row_id", LongType(), False),
+                    StructField("data", ArrayType(FloatType(), False), False),
                 ]
             )
-            data_df = spark.createDataFrame(pd.DataFrame(array), schema=schema)
-            data_df.write.parquet(df_path, mode="overwrite")
+            data_df = spark.createDataFrame(
+                pd.DataFrame(
+                    {
+                        "row_id": range(array.shape[0]),
+                        "data": list(array),
+                    }
+                ),
+                schema=schema,
+            )
+            data_df.write.parquet(df_path)
 
         DefaultParamsWriter.saveMetadata(
             self.instance,
@@ -1491,12 +1500,12 @@ def write_dense_array(array: np.ndarray, df_path: str) -> None:
             },
         )
 
+        # get a copy, since we're going to modify the array attributes
         model_attributes = self.instance._get_model_attributes()
         assert model_attributes is not None
+        model_attributes = model_attributes.copy()
 
         data_path = os.path.join(path, "data")
-        if not os.path.exists(data_path):
-            os.makedirs(data_path)
 
         for key in ["embedding_", "raw_data_"]:
             array = model_attributes[key]
@@ -1547,8 +1556,10 @@ def read_sparse_array(
             return scipy.sparse.csr_matrix((data, indices, indptr), shape=csr_shape)
 
         def read_dense_array(df_path: str) -> np.ndarray:
-            data_df = spark.read.parquet(df_path)
-            return np.array(data_df.collect(), dtype=np.float32)
+            data_df = spark.read.parquet(df_path).orderBy("row_id")
+            pdf = data_df.toPandas()
+            assert type(pdf) == pd.DataFrame
+            return np.array(list(pdf.data), dtype=np.float32)
 
         metadata = DefaultParamsReader.loadMetadata(path, self.sc)
         data_path = os.path.join(path, "data")
diff --git a/python/tests/test_umap.py b/python/tests/test_umap.py
index f764d9a2..b3a85359 100644
--- a/python/tests/test_umap.py
+++ b/python/tests/test_umap.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2024, NVIDIA CORPORATION.
+# Copyright (c) 2025, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 import cupy as cp
-import cupyx
 import numpy as np
 import pytest
 import scipy
@@ -415,6 +414,9 @@ def test_umap_copy() -> None:
 def test_umap_model_persistence(
     sparse_fit: bool, gpu_number: int, tmp_path: str
 ) -> None:
+    import os
+    import re
+
     import pyspark
     from packaging import version
 
@@ -459,7 +461,42 @@ def test_umap_model_persistence(
     path = tmp_path + "/umap_tests"
     model_path = f"{path}/umap_model"
     umap_model.write().overwrite().save(model_path)
+
+    try:
+        umap_model.write().save(model_path)
+        assert False, "Overwriting should not be permitted"
+    except Exception as e:
+        assert re.search(r"Output directory .* already exists", str(e))
+
+    # double check expected files/directories
+    model_dir_contents = os.listdir(model_path)
+    data_dir_contents = os.listdir(f"{model_path}/data")
+    assert set(model_dir_contents) == {"data", "metadata"}
+    if sparse_fit:
+        assert set(data_dir_contents) == {
+            "metadata.json",
+            "embedding_.parquet",
+            "raw_data_csr",
+        }
+        assert set(os.listdir(f"{model_path}/data/raw_data_csr")) == {
+            "indptr.parquet",
+            "indices_data.parquet",
+        }
+    else:
+        assert set(data_dir_contents) == {
+            "metadata.json",
+            "embedding_.parquet",
+            "raw_data_.parquet",
+        }
+
+    # make sure we can overwrite
+    umap_model._cuml_params["n_neighbors"] = 10
+    umap_model._cuml_params["set_op_mix_ratio"] = 0.4
+    umap_model.write().overwrite().save(model_path)
+
     umap_model_loaded = UMAPModel.load(model_path)
+    assert umap_model_loaded._cuml_params["n_neighbors"] == 10
+    assert umap_model_loaded._cuml_params["set_op_mix_ratio"] == 0.4
     _assert_umap_model(umap_model_loaded, input_raw_data)
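
A minimal standalone sketch of the ordering-safe round trip implemented by write_dense_array/read_dense_array above, not part of the patch: each row is persisted with an explicit row_id, and orderBy("row_id") on read restores the original array order, since Parquet part files do not guarantee row order. The output path and variable names are illustrative only; it assumes a local PySpark session with Arrow-based conversion enabled, as the patch asserts.

    import numpy as np
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.types import ArrayType, FloatType, LongType, StructField, StructType

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

    # Dense array to persist; one Parquet row per array row, tagged with its index.
    array = np.random.rand(100, 8).astype(np.float32)
    schema = StructType(
        [
            StructField("row_id", LongType(), False),
            StructField("data", ArrayType(FloatType(), False), False),
        ]
    )
    df = spark.createDataFrame(
        pd.DataFrame({"row_id": range(array.shape[0]), "data": list(array)}),
        schema=schema,
    )
    df.write.parquet("/tmp/dense_array.parquet")  # hypothetical output path

    # Reading back: orderBy("row_id") restores the original row order before
    # the rows are stacked into a single dense ndarray.
    pdf = spark.read.parquet("/tmp/dense_array.parquet").orderBy("row_id").toPandas()
    restored = np.array(list(pdf.data), dtype=np.float32)
    assert np.array_equal(array, restored)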