Merge pull request #814 from NVIDIA/branch-24.12
[auto-merge] branch-24.12 to branch-25.02 [skip ci] [bot]
nvauto authored Dec 31, 2024
2 parents e1a96a3 + 9c5df37 commit 767b334
Showing 4 changed files with 357 additions and 466 deletions.
549 changes: 152 additions & 397 deletions notebooks/umap.ipynb

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions python/src/spark_rapids_ml/clustering.py
@@ -983,9 +983,7 @@ def _get_cuml_transform_func(
_TransformFunc,
Optional[_EvaluateFunc],
]:
-        raise NotImplementedError(
-            "DBSCAN does not can not have a separate transform UDF"
-        )
+        raise NotImplementedError("DBSCAN does not have a separate transform UDF")

def _transform(self, dataset: DataFrame) -> DataFrame:
logger = get_logger(self.__class__)
128 changes: 83 additions & 45 deletions python/src/spark_rapids_ml/umap.py
@@ -35,6 +35,7 @@
import pandas as pd
import pyspark
import scipy
+from numpy.typing import ArrayLike
from pandas import DataFrame as PandasDataFrame
from pyspark.ml.param.shared import (
HasFeaturesCol,
@@ -47,6 +48,7 @@
from pyspark.ml.util import DefaultParamsReader, DefaultParamsWriter, MLReader, MLWriter
from pyspark.sql import Column, DataFrame
from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import (
ArrayType,
DoubleType,
@@ -160,7 +162,6 @@ def __init__(self) -> None:
transform_queue_size=4.0,
a=None,
b=None,
-            precomputed_knn=None,
random_state=None,
build_algo="auto",
build_kwds=None,
@@ -335,18 +336,6 @@ def __init__(self) -> None:
typeConverter=TypeConverters.toFloat,
)

-    precomputed_knn = Param(
-        Params._dummy(),
-        "precomputed_knn",
-        (
-            f"Either one of a tuple (indices, distances) of arrays of shape (n_samples, n_neighbors), a pairwise distances"
-            f" dense array of shape (n_samples, n_samples) or a KNN graph sparse array (preferably CSR/COO). This feature"
-            f" allows the precomputation of the KNN outside of UMAP and also allows the use of a custom distance function."
-            f" This function should match the metric used to train the UMAP embeedings."
-        ),
-        typeConverter=TypeConverters.toListListFloat,
-    )

random_state = Param(
Params._dummy(),
"random_state",
@@ -586,18 +575,6 @@ def setB(self: P, value: float) -> P:
"""
return self._set_params(b=value)

-    def getPrecomputedKNN(self: P) -> List[List[float]]:
-        """
-        Gets the value of `precomputed_knn`.
-        """
-        return self.getOrDefault("precomputed_knn")
-
-    def setPrecomputedKNN(self: P, value: List[List[float]]) -> P:
-        """
-        Sets the value of `precomputed_knn`.
-        """
-        return self._set_params(precomputed_knn=value)

def getRandomState(self: P) -> int:
"""
Gets the value of `random_state`.
@@ -841,7 +818,7 @@ class UMAP(UMAPClass, _CumlEstimatorSupervised, _UMAPCumlParams):
>>> X, _ = make_blobs(500, 5, centers=42, cluster_std=0.1, dtype=np.float32, random_state=10)
>>> feature_cols = [f"c{i}" for i in range(X.shape[1])]
-    >>> schema = [f"{c} {"float"}" for c in feature_cols]
+    >>> schema = [f"{c} {'float'}" for c in feature_cols]
>>> df = spark.createDataFrame(X.tolist(), ",".join(schema))
>>> df = df.withColumn("features", array(*feature_cols)).drop(*feature_cols)
>>> df.show(10, False)
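A side note on the doctest fix above: before Python 3.12, an f-string cannot reuse its own quote character inside a replacement field, so the double-quoted form is a SyntaxError on older interpreters. A standalone illustration, not part of the diff:

feature_cols = ["c0", "c1"]
# schema = [f"{c} {"float"}" for c in feature_cols]  # SyntaxError on Python <= 3.11
schema = [f"{c} {'float'}" for c in feature_cols]    # portable across versions
print(",".join(schema))  # c0 float,c1 float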
@@ -904,7 +881,7 @@ def __init__(
transform_queue_size: Optional[float] = 1.0,
a: Optional[float] = None,
b: Optional[float] = None,
-        precomputed_knn: Optional[List[List[float]]] = None,
+        precomputed_knn: Optional[Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]] = None,
random_state: Optional[int] = None,
build_algo: Optional[str] = "auto",
build_kwds: Optional[Dict[str, Any]] = None,
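For reference, a minimal sketch of supplying the widened precomputed_knn argument as an (indices, distances) tuple, following the semantics described in the Param docstring removed above. The scikit-learn KNN step, the toy data, and the surrounding setup are illustrative assumptions, not part of this commit:

import numpy as np
from sklearn.neighbors import NearestNeighbors
from spark_rapids_ml.umap import UMAP

X = np.random.rand(500, 5).astype(np.float32)

# kneighbors() returns (distances, indices); the tuple is passed as (indices, distances).
distances, indices = NearestNeighbors(n_neighbors=15).fit(X).kneighbors(X)

umap = UMAP(n_neighbors=15, precomputed_knn=(indices, distances))
# ...then set the features column and call fit() on a Spark DataFrame as in the class doctest.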
@@ -948,6 +925,9 @@ def _fit(self, dataset: DataFrame) -> "UMAPModel":
# Force to single partition, single worker
self._num_workers = 1
if data_subset.rdd.getNumPartitions() != 1:
+            get_logger(self.__class__).info(
+                "Coalescing input data(sub)set to one partition for fit()."
+            )
data_subset = data_subset.coalesce(1)

df_output = self._call_cuml_fit_func_dataframe(
@@ -992,6 +972,8 @@ def _fit(self, dataset: DataFrame) -> "UMAPModel":

model._num_workers = input_num_workers

+        # We don't need the precomputed knn anymore
+        self.cuml_params["precomputed_knn"] = None
self._copyValues(model)
self._copy_cuml_params(model) # type: ignore

@@ -1441,19 +1423,45 @@ def _out_schema(self, input_schema: StructType) -> Union[StructType, str]:
return f"array<{pyspark_type}>"

def write(self) -> MLWriter:
-        return _CumlModelWriterNumpy(self)
+        return _CumlModelWriterParquet(self)

@classmethod
def read(cls) -> MLReader:
-        return _CumlModelReaderNumpy(cls)
+        return _CumlModelReaderParquet(cls)


-class _CumlModelWriterNumpy(_CumlModelWriter):
+class _CumlModelWriterParquet(_CumlModelWriter):
"""
-    Override parent writer to save numpy objects of _CumlModel to the file
+    Override parent writer to write _CumlModel array attributes to Spark Dataframe as parquet
"""

def saveImpl(self, path: str) -> None:

+        spark = _get_spark_session()
+
+        def write_sparse_array(array: scipy.sparse.spmatrix, df_dir: str) -> None:
+            indptr_df = spark.createDataFrame(array.indptr, schema=["indptr"])
+            indices_data_df = spark.createDataFrame(
+                pd.DataFrame(
+                    {
+                        "indices": array.indices,
+                        "data": array.data,
+                        "row_id": range(len(array.indices)),
+                    }
+                )
+            )
+
+            indptr_df.write.parquet(
+                os.path.join(df_dir, "indptr.parquet"), mode="overwrite"
+            )
+            indices_data_df.write.parquet(
+                os.path.join(df_dir, "indices_data.parquet"), mode="overwrite"
+            )
+
+        def write_dense_array(array: np.ndarray, df_path: str) -> None:
+            data_df = spark.createDataFrame(array)
+            data_df.write.parquet(df_path, mode="overwrite")
+
DefaultParamsWriter.saveMetadata(
self.instance,
path,
@@ -1464,22 +1472,25 @@ def saveImpl(self, path: str) -> None:
"_float32_inputs": self.instance._float32_inputs,
},
)
-        data_path = os.path.join(path, "data")

model_attributes = self.instance._get_model_attributes()
+        assert model_attributes is not None
+
+        data_path = os.path.join(path, "data")
if not os.path.exists(data_path):
os.makedirs(data_path)
-        assert model_attributes is not None

for key in ["embedding_", "raw_data_"]:
array = model_attributes[key]
if isinstance(array, scipy.sparse.csr_matrix):
-                npz_path = os.path.join(data_path, f"{key}csr_.npz")
-                scipy.sparse.save_npz(npz_path, array)
+                df_dir = os.path.join(data_path, f"{key}csr")
+                write_sparse_array(array, df_dir)
+                model_attributes[key] = df_dir
+                model_attributes[key + "shape"] = array.shape
else:
-                npz_path = os.path.join(data_path, f"{key}.npz")
-                np.savez_compressed(npz_path, array)
-                model_attributes[key] = npz_path
+                df_path = os.path.join(data_path, f"{key}.parquet")
+                write_dense_array(array, df_path)
+                model_attributes[key] = df_path

metadata_file_path = os.path.join(data_path, "metadata.json")
model_attributes_str = json.dumps(model_attributes)
@@ -1488,12 +1499,39 @@ def saveImpl(self, path: str) -> None:
)
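A note on the on-disk layout produced by write_sparse_array above: each sparse attribute becomes a directory holding two parquet datasets, indptr.parquet and indices_data.parquet, and an explicit row_id column is stored because Spark does not guarantee row order when writing or reading parquet; the matching reader below restores the order with orderBy. A small standalone illustration of the three CSR components being serialized (toy matrix assumed, not the library's code):

import numpy as np
import scipy.sparse

m = scipy.sparse.csr_matrix(
    np.array([[0, 1, 0, 2], [0, 0, 0, 0], [3, 0, 4, 0]], dtype=np.float32)
)
print(m.indptr)   # [0 2 2 4]      -> indptr.parquet (single nondecreasing column)
print(m.indices)  # [1 3 0 2]      -> "indices" column of indices_data.parquet
print(m.data)     # [1. 2. 3. 4.]  -> "data" column of indices_data.parquet
# row_id = 0..len(indices)-1 is written alongside so read_sparse_array can orderBy("row_id").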


-class _CumlModelReaderNumpy(_CumlModelReader):
+class _CumlModelReaderParquet(_CumlModelReader):
"""
-    Override parent reader to instantiate numpy objects of _CumlModel from file
+    Override parent reader to instantiate _CumlModel array attributes from Spark DataFrame parquet files
"""

def load(self, path: str) -> "_CumlEstimator":

+        spark = _get_spark_session()
+
+        def read_sparse_array(
+            df_dir: str, csr_shape: Tuple[int, int]
+        ) -> scipy.sparse.csr_matrix:
+            indptr_df = spark.read.parquet(
+                os.path.join(df_dir, "indptr.parquet")
+            ).orderBy("indptr")
+            indices_data_df = spark.read.parquet(
+                os.path.join(df_dir, "indices_data.parquet")
+            ).orderBy("row_id")
+
+            indptr = np.squeeze(np.array(indptr_df.collect(), dtype=np.int32))
+            indices = np.squeeze(
+                np.array(indices_data_df.select("indices").collect(), dtype=np.int32)
+            )
+            data = np.squeeze(
+                np.array(indices_data_df.select("data").collect(), dtype=np.float32)
+            )
+
+            return scipy.sparse.csr_matrix((data, indices, indptr), shape=csr_shape)
+
+        def read_dense_array(df_path: str) -> np.ndarray:
+            data_df = spark.read.parquet(df_path)
+            return np.array(data_df.collect(), dtype=np.float32)
+
metadata = DefaultParamsReader.loadMetadata(path, self.sc)
data_path = os.path.join(path, "data")
metadata_file_path = os.path.join(data_path, "metadata.json")
@@ -1502,12 +1540,12 @@ def load(self, path: str) -> "_CumlEstimator":
model_attr_dict = json.loads(model_attr_str)

for key in ["embedding_", "raw_data_"]:
-            npz_path = model_attr_dict[key]
-            if npz_path.endswith("csr_.npz"):
-                model_attr_dict[key] = scipy.sparse.load_npz(npz_path)
+            df_path = model_attr_dict[key]
+            if df_path.endswith("csr"):
+                csr_shape = model_attr_dict.pop(key + "shape")
+                model_attr_dict[key] = read_sparse_array(df_path, csr_shape)
else:
-                with np.load(npz_path) as data:
-                    model_attr_dict[key] = data["arr_0"]
+                model_attr_dict[key] = read_dense_array(df_path)

instance = self.model_cls(**model_attr_dict)
DefaultParamsReader.getAndSetParams(instance, metadata)
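To make the new serialization path concrete, a self-contained sketch of the parquet round trip that write_sparse_array and read_sparse_array implement between them. The local SparkSession, /tmp paths, and toy matrix are assumptions for illustration only:

import numpy as np
import pandas as pd
import scipy.sparse
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
csr = scipy.sparse.random(5, 8, density=0.4, format="csr", dtype=np.float32)

# Write the three CSR components the way write_sparse_array does.
spark.createDataFrame(pd.DataFrame({"indptr": csr.indptr})).write.parquet(
    "/tmp/indptr.parquet", mode="overwrite"
)
spark.createDataFrame(
    pd.DataFrame(
        {"indices": csr.indices, "data": csr.data, "row_id": range(len(csr.indices))}
    )
).write.parquet("/tmp/indices_data.parquet", mode="overwrite")

# Read them back in order and rebuild the matrix the way read_sparse_array does.
indptr = np.squeeze(
    np.array(
        spark.read.parquet("/tmp/indptr.parquet").orderBy("indptr").collect(),
        dtype=np.int32,
    )
)
rows = spark.read.parquet("/tmp/indices_data.parquet").orderBy("row_id").toPandas()
restored = scipy.sparse.csr_matrix(
    (rows["data"].to_numpy(np.float32), rows["indices"].to_numpy(np.int32), indptr),
    shape=csr.shape,
)
assert np.allclose(restored.toarray(), csr.toarray())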