[auto-merge] branch-24.12 to branch-25.02 [skip ci] [bot] #814

Merged 1 commit into branch-25.02 from branch-24.12 on Dec 31, 2024
549 changes: 152 additions & 397 deletions notebooks/umap.ipynb

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions python/src/spark_rapids_ml/clustering.py
@@ -983,9 +983,7 @@ def _get_cuml_transform_func(
_TransformFunc,
Optional[_EvaluateFunc],
]:
raise NotImplementedError(
"DBSCAN does not can not have a separate transform UDF"
)
raise NotImplementedError("DBSCAN does not have a separate transform UDF")

def _transform(self, dataset: DataFrame) -> DataFrame:
logger = get_logger(self.__class__)
128 changes: 83 additions & 45 deletions python/src/spark_rapids_ml/umap.py
@@ -35,6 +35,7 @@
import pandas as pd
import pyspark
import scipy
from numpy.typing import ArrayLike
from pandas import DataFrame as PandasDataFrame
from pyspark.ml.param.shared import (
HasFeaturesCol,
@@ -47,6 +48,7 @@
from pyspark.ml.util import DefaultParamsReader, DefaultParamsWriter, MLReader, MLWriter
from pyspark.sql import Column, DataFrame
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import (
ArrayType,
DoubleType,
@@ -160,7 +162,6 @@ def __init__(self) -> None:
transform_queue_size=4.0,
a=None,
b=None,
precomputed_knn=None,
random_state=None,
build_algo="auto",
build_kwds=None,
@@ -335,18 +336,6 @@ def __init__(self) -> None:
typeConverter=TypeConverters.toFloat,
)

precomputed_knn = Param(
Params._dummy(),
"precomputed_knn",
(
f"Either one of a tuple (indices, distances) of arrays of shape (n_samples, n_neighbors), a pairwise distances"
f" dense array of shape (n_samples, n_samples) or a KNN graph sparse array (preferably CSR/COO). This feature"
f" allows the precomputation of the KNN outside of UMAP and also allows the use of a custom distance function."
f" This function should match the metric used to train the UMAP embeedings."
),
typeConverter=TypeConverters.toListListFloat,
)

random_state = Param(
Params._dummy(),
"random_state",
@@ -586,18 +575,6 @@ def setB(self: P, value: float) -> P:
"""
return self._set_params(b=value)

def getPrecomputedKNN(self: P) -> List[List[float]]:
"""
Gets the value of `precomputed_knn`.
"""
return self.getOrDefault("precomputed_knn")

def setPrecomputedKNN(self: P, value: List[List[float]]) -> P:
"""
Sets the value of `precomputed_knn`.
"""
return self._set_params(precomputed_knn=value)

def getRandomState(self: P) -> int:
"""
Gets the value of `random_state`.
@@ -841,7 +818,7 @@ class UMAP(UMAPClass, _CumlEstimatorSupervised, _UMAPCumlParams):

>>> X, _ = make_blobs(500, 5, centers=42, cluster_std=0.1, dtype=np.float32, random_state=10)
>>> feature_cols = [f"c{i}" for i in range(X.shape[1])]
>>> schema = [f"{c} {"float"}" for c in feature_cols]
>>> schema = [f"{c} {'float'}" for c in feature_cols]
>>> df = spark.createDataFrame(X.tolist(), ",".join(schema))
>>> df = df.withColumn("features", array(*feature_cols)).drop(*feature_cols)
>>> df.show(10, False)
Expand Down Expand Up @@ -904,7 +881,7 @@ def __init__(
transform_queue_size: Optional[float] = 1.0,
a: Optional[float] = None,
b: Optional[float] = None,
precomputed_knn: Optional[List[List[float]]] = None,
precomputed_knn: Optional[Union[ArrayLike, Tuple[ArrayLike, ArrayLike]]] = None,
random_state: Optional[int] = None,
build_algo: Optional[str] = "auto",
build_kwds: Optional[Dict[str, Any]] = None,
@@ -948,6 +925,9 @@ def _fit(self, dataset: DataFrame) -> "UMAPModel":
# Force to single partition, single worker
self._num_workers = 1
if data_subset.rdd.getNumPartitions() != 1:
get_logger(self.__class__).info(
"Coalescing input data(sub)set to one partition for fit()."
)
data_subset = data_subset.coalesce(1)

df_output = self._call_cuml_fit_func_dataframe(
@@ -992,6 +972,8 @@ def _fit(self, dataset: DataFrame) -> "UMAPModel":

model._num_workers = input_num_workers

# We don't need the precomputed knn anymore
self.cuml_params["precomputed_knn"] = None
self._copyValues(model)
self._copy_cuml_params(model) # type: ignore

@@ -1441,19 +1423,45 @@ def _out_schema(self, input_schema: StructType) -> Union[StructType, str]:
return f"array<{pyspark_type}>"

def write(self) -> MLWriter:
return _CumlModelWriterNumpy(self)
return _CumlModelWriterParquet(self)

@classmethod
def read(cls) -> MLReader:
return _CumlModelReaderNumpy(cls)
return _CumlModelReaderParquet(cls)


class _CumlModelWriterNumpy(_CumlModelWriter):
class _CumlModelWriterParquet(_CumlModelWriter):
"""
Override parent writer to save numpy objects of _CumlModel to the file
Override parent writer to write _CumlModel array attributes to Spark Dataframe as parquet
"""

def saveImpl(self, path: str) -> None:

spark = _get_spark_session()

def write_sparse_array(array: scipy.sparse.spmatrix, df_dir: str) -> None:
indptr_df = spark.createDataFrame(array.indptr, schema=["indptr"])
indices_data_df = spark.createDataFrame(
pd.DataFrame(
{
"indices": array.indices,
"data": array.data,
"row_id": range(len(array.indices)),
}
)
)

indptr_df.write.parquet(
os.path.join(df_dir, "indptr.parquet"), mode="overwrite"
)
indices_data_df.write.parquet(
os.path.join(df_dir, "indices_data.parquet"), mode="overwrite"
)

def write_dense_array(array: np.ndarray, df_path: str) -> None:
data_df = spark.createDataFrame(array)
data_df.write.parquet(df_path, mode="overwrite")

DefaultParamsWriter.saveMetadata(
self.instance,
path,
@@ -1464,22 +1472,25 @@ def saveImpl(self, path: str) -> None:
"_float32_inputs": self.instance._float32_inputs,
},
)
data_path = os.path.join(path, "data")

model_attributes = self.instance._get_model_attributes()
assert model_attributes is not None

data_path = os.path.join(path, "data")
if not os.path.exists(data_path):
os.makedirs(data_path)
assert model_attributes is not None

for key in ["embedding_", "raw_data_"]:
array = model_attributes[key]
if isinstance(array, scipy.sparse.csr_matrix):
npz_path = os.path.join(data_path, f"{key}csr_.npz")
scipy.sparse.save_npz(npz_path, array)
df_dir = os.path.join(data_path, f"{key}csr")
write_sparse_array(array, df_dir)
model_attributes[key] = df_dir
model_attributes[key + "shape"] = array.shape
else:
npz_path = os.path.join(data_path, f"{key}.npz")
np.savez_compressed(npz_path, array)
model_attributes[key] = npz_path
df_path = os.path.join(data_path, f"{key}.parquet")
write_dense_array(array, df_path)
model_attributes[key] = df_path

metadata_file_path = os.path.join(data_path, "metadata.json")
model_attributes_str = json.dumps(model_attributes)
@@ -1488,12 +1499,39 @@ def saveImpl(self, path: str) -> None:
)
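
For orientation, the on-disk layout produced by this writer is roughly the sketch below. It is inferred from the paths built above; whether embedding_ and raw_data_ take the dense or the CSR branch depends on each array's type, and the metadata/ location is the usual DefaultParamsWriter convention rather than something shown in this diff.

<model_path>/
    metadata/                   Spark params metadata (DefaultParamsWriter.saveMetadata)
    data/
        metadata.json           json.dumps of model attributes, with array values replaced by their paths
        embedding_.parquet/     dense array written by write_dense_array
        raw_data_csr/           sparse CSR written by write_sparse_array
            indptr.parquet/
            indices_data.parquet/
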


class _CumlModelReaderNumpy(_CumlModelReader):
class _CumlModelReaderParquet(_CumlModelReader):
"""
Override parent reader to instantiate numpy objects of _CumlModel from file
Override parent reader to instantiate _CumlModel array attributes from Spark DataFrame parquet files
"""

def load(self, path: str) -> "_CumlEstimator":

spark = _get_spark_session()

def read_sparse_array(
df_dir: str, csr_shape: Tuple[int, int]
) -> scipy.sparse.csr_matrix:
indptr_df = spark.read.parquet(
os.path.join(df_dir, "indptr.parquet")
).orderBy("indptr")
indices_data_df = spark.read.parquet(
os.path.join(df_dir, "indices_data.parquet")
).orderBy("row_id")

indptr = np.squeeze(np.array(indptr_df.collect(), dtype=np.int32))
indices = np.squeeze(
np.array(indices_data_df.select("indices").collect(), dtype=np.int32)
)
data = np.squeeze(
np.array(indices_data_df.select("data").collect(), dtype=np.float32)
)

return scipy.sparse.csr_matrix((data, indices, indptr), shape=csr_shape)

def read_dense_array(df_path: str) -> np.ndarray:
data_df = spark.read.parquet(df_path)
return np.array(data_df.collect(), dtype=np.float32)

metadata = DefaultParamsReader.loadMetadata(path, self.sc)
data_path = os.path.join(path, "data")
metadata_file_path = os.path.join(data_path, "metadata.json")
@@ -1502,12 +1540,12 @@ def load(self, path: str) -> "_CumlEstimator":
model_attr_dict = json.loads(model_attr_str)

for key in ["embedding_", "raw_data_"]:
npz_path = model_attr_dict[key]
if npz_path.endswith("csr_.npz"):
model_attr_dict[key] = scipy.sparse.load_npz(npz_path)
df_path = model_attr_dict[key]
if df_path.endswith("csr"):
csr_shape = model_attr_dict.pop(key + "shape")
model_attr_dict[key] = read_sparse_array(df_path, csr_shape)
else:
with np.load(npz_path) as data:
model_attr_dict[key] = data["arr_0"]
model_attr_dict[key] = read_dense_array(df_path)

instance = self.model_cls(**model_attr_dict)
DefaultParamsReader.getAndSetParams(instance, metadata)
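
To make the new serialization path concrete, here is a self-contained sketch of the CSR round trip that the writer/reader pair above implements: indptr goes to one Parquet table, (indices, data) to another keyed by row_id, and the CSR matrix is rebuilt from the sorted columns. The helper names save_csr/load_csr, the /tmp/csr_demo path, and the random test matrix are illustrative assumptions, not part of this PR.

import os

import numpy as np
import pandas as pd
import scipy.sparse
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def save_csr(mat: scipy.sparse.csr_matrix, out_dir: str) -> None:
    # indptr gets its own table; (indices, data) share a table keyed by row_id
    # so their original order can be restored on read.
    spark.createDataFrame(pd.DataFrame({"indptr": mat.indptr})).write.parquet(
        os.path.join(out_dir, "indptr.parquet"), mode="overwrite"
    )
    spark.createDataFrame(
        pd.DataFrame(
            {
                "indices": mat.indices,
                "data": mat.data,
                "row_id": range(len(mat.indices)),
            }
        )
    ).write.parquet(os.path.join(out_dir, "indices_data.parquet"), mode="overwrite")


def load_csr(out_dir: str, shape: tuple) -> scipy.sparse.csr_matrix:
    # indptr is non-decreasing, so ordering by value restores its original order.
    indptr_pdf = (
        spark.read.parquet(os.path.join(out_dir, "indptr.parquet"))
        .orderBy("indptr")
        .toPandas()
    )
    id_pdf = (
        spark.read.parquet(os.path.join(out_dir, "indices_data.parquet"))
        .orderBy("row_id")
        .toPandas()
    )
    return scipy.sparse.csr_matrix(
        (
            id_pdf["data"].to_numpy(np.float32),
            id_pdf["indices"].to_numpy(np.int32),
            indptr_pdf["indptr"].to_numpy(np.int32),
        ),
        shape=shape,
    )


# Round-trip a small random CSR matrix and check that it is unchanged.
m = scipy.sparse.random(10, 5, density=0.3, format="csr", dtype=np.float32)
save_csr(m, "/tmp/csr_demo")
assert (load_csr("/tmp/csr_demo", m.shape) != m).nnz == 0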