
MosaicML-Streaming on Databricks #801

Open
gtmdotme opened this issue Oct 10, 2024 · 9 comments

@gtmdotme

gtmdotme commented Oct 10, 2024

Hi all, I'm a new user of mosaicml-streaming on Databricks. I stumbled upon MosaicML Streaming (and Petastorm) while looking for a way to load large data from PySpark into PyTorch tensors. Here is an example Jupyter notebook that I'm trying to replicate on my Databricks clusters; however, I have a few questions:

  1. The above notebook's requirements say that we need a "Databricks Runtime for ML 15.2 or higher". However, my organization has an earlier version. Can we use mosaic-streaming on earlier runtime versions?

  2. The above notebook imports "from petastorm import TransformSpec", but this blog says that Petastorm is deprecated and suggests using mosaic-streaming instead. I checked the code, and it imports petastorm but never uses it. Can someone confirm whether this package was mistakenly imported?

  3. My understanding of mosaic-streaming is that it takes a PySpark dataframe as input and provides an API for getting a PyTorch dataloader as output, which can be used for ML training on the fly without writing the whole dataset out in some MDS format. Is my understanding correct?

PS: I started a discussion on your Slack community but was redirected to submit an issue here.

@XiaohanZhangCMU
Collaborator

Hi @gtmdotme, petastorm is not supported anymore as a DL dataloader on Databricks, and my knowledge about petastorm is also very limited :face_palm:
streaming is not bundled with earlier DBR versions, so you have to use 15.2 and above. On an older DBR, you can manually install it via pip install mosaicml-streaming==x.x.x
Your understanding in point 3 is correct; the tutorial is here
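
Roughly, the end-to-end flow looks like the sketch below (df_spark, the paths, and the parameters are placeholders, not exact tutorial code):

# Minimal sketch: Spark DataFrame -> MDS shards -> PyTorch DataLoader.
# Paths below are placeholders; adapt them to your workspace.
from streaming.base.converters import dataframe_to_mds
from streaming import StreamingDataset
from torch.utils.data import DataLoader

out_path = '/dbfs/tmp/my_dataset_mds'  # hypothetical location for the MDS shards
dataframe_to_mds(df_spark, merge_index=True, mds_kwargs={'out': out_path})

# At training time, stream the shards back and wrap them in a DataLoader.
dataset = StreamingDataset(remote=out_path, local='/local_disk0/mds_cache',
                           shuffle=True, batch_size=32)
loader = DataLoader(dataset, batch_size=32)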

Let us know if you need more help.

@gtmdotme
Author

gtmdotme commented Oct 21, 2024

Hi @XiaohanZhangCMU, thanks for your answers. I have a follow-up question on the third point. The tutorial you shared seems to indicate that we first need to convert Parquet (PySpark dataframe) to the MDS format (streaming dataset). So, does that mean we will consume double the storage: one copy for the stored Parquet data and another for the MDS-format data?

@XiaohanZhangCMU
Collaborator

@gtmdotme yes, that's correct. You will need to persist two copies.

@gtmdotme
Author

gtmdotme commented Oct 22, 2024

We often store data in Parquet format (for PySpark dataframes), but to use your library, we also need to convert it to the MDS format. This creates a second copy of the data, which could lead to a significant increase in disk storage usage. Is that a known tradeoff?

In a comparison on a synthetic dataset (example code below), I found that the MDS format takes nearly 30x more disk space than the Parquet format. I'm wondering if I'm doing something wrong or if this is expected behavior. This code was tested on Databricks Runtime 15.4 LTS ML.

Thanks for your help!

Example

import os, shutil
import numpy as np
import pandas as pd
from sklearn.datasets import make_multilabel_classification
from streaming.base.converters import dataframe_to_mds
 
local_dir = './data_loading/'
shutil.rmtree(local_dir, ignore_errors=True)
os.makedirs(local_dir, exist_ok=True)
 
def get_dummy_data(n_samples, n_features, n_labels, seed):
    n_classes = n_labels  ## Max number of labels
    avg_labels_per_class = int(n_classes*0.1)  ## Number of average labels per instance, here 10%
    
    # Generate dummy data
    X, y = make_multilabel_classification(n_samples=n_samples, n_features=n_features, 
                                        n_classes=n_classes, n_labels=avg_labels_per_class, 
                                        random_state=seed)
    
    # Convert to pandas dataframe
    feature_cols = [f'feature_{i}' for i in range(n_features)]
    target_cols = [f'target_{i}' for i in range(n_classes)]
    X = pd.DataFrame(X, columns=feature_cols)
    y = pd.DataFrame(y, columns=target_cols)
 
    # Merge the group ids with the dataset
    data = pd.concat([X, y], axis=1)
 
    return data, feature_cols, target_cols
 
df, feature_cols, target_cols = get_dummy_data(n_samples=100_000, n_features=128, n_labels=32, seed=42)
print(df.shape)
 
# save as parquet, csv and numpy array
df.to_parquet(local_dir+'dummy_data.parquet')
df.to_csv(local_dir+'dummy_data.csv', index=False)
np.save(local_dir+"dummy_numpy.npy", df.to_numpy())
 
# convert pandas df to pyspark df
df_spark = spark.createDataFrame(df)
df_spark.write.mode("overwrite").save(local_dir+"dummy_spark")
 
# save the dataset to an MDS format
out_path = os.path.join(local_dir, 'dummy_mosaic')
shutil.rmtree(out_path, ignore_errors=True)
mds_kwargs = {'out': out_path}
dataframe_to_mds(df_spark.repartition(4), merge_index=True, mds_kwargs=mds_kwargs)

Output:

!du -achd1 {local_dir}

55M ./data_loading/dummy_data.csv 
4.4M ./data_loading/dummy_data.parquet 
123M ./data_loading/dummy_mosaic 
123M ./data_loading/dummy_numpy.npy
5.2M ./data_loading/dummy_spark 
310M ./data_loading/ 
310M total

@XiaohanZhangCMU
Collaborator

Hey @gtmdotme, yes, the trade-off of keeping an additional copy is known.

To help reduce the size of the MDS copy, you may want to add a compression method to mds_kwargs (for details, take a look at this page).
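
Roughly something like this (the codec and level are just examples; see the linked page for the supported options):

# Sketch: pass a compression codec via mds_kwargs when converting.
mds_kwargs = {'out': out_path, 'compression': 'zstd:9'}
dataframe_to_mds(df_spark.repartition(4), merge_index=True, mds_kwargs=mds_kwargs)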

Parquet compresses data efficiently when there are many repetitive values, so I am not surprised it ends up much smaller: without compression, your MDS copy is just a serialized binary format.

Let me know what the size looks like after you apply the compression method.

@gtmdotme
Author

Awesome! Compression did help. I used zstd:9 and it drastically reduced the data size.

Here is the final code:

import os, shutil
import numpy as np
import pandas as pd
from sklearn.datasets import make_multilabel_classification
from streaming.base.converters import dataframe_to_mds
from pyspark.sql.functions import col

local_dir = './data_loading/'
shutil.rmtree(local_dir, ignore_errors=True)
os.makedirs(local_dir, exist_ok=True)

def get_dummy_data(n_samples, n_features, n_labels, seed):
    n_classes = n_labels  ## Max number of labels
    avg_labels_per_class = int(n_classes*0.1)  ## Number of average labels per instance, here 10%
    
    # Generate dummy data
    X, y = make_multilabel_classification(n_samples=n_samples, n_features=n_features, 
                                        n_classes=n_classes, n_labels=avg_labels_per_class, 
                                        random_state=seed)
    
    # Convert to pandas dataframe
    feature_cols = [f'feature_{i}' for i in range(n_features)]
    target_cols = [f'target_{i}' for i in range(n_classes)]
    X = pd.DataFrame(X, columns=feature_cols)
    y = pd.DataFrame(y, columns=target_cols)

    # Merge the group ids with the dataset
    data = pd.concat([X, y], axis=1)

    return data, feature_cols, target_cols

df, feature_cols, target_cols = get_dummy_data(n_samples=100_000, n_features=128, n_labels=32, seed=42)
print(df.shape)

# save as parquet, csv and numpy array
df.to_parquet(local_dir+'dummy_data.parquet')
df.to_csv(local_dir+'dummy_data.csv', index=False)
np.save(local_dir+'dummy_numpy.npy', df.to_numpy())

# convert pandas df to pyspark df
df_spark = spark.createDataFrame(df)
df_spark.write.mode("overwrite").save(local_dir+"dummy_spark")

# save the dataset to an MDS format
out_path = os.path.join(local_dir, 'dummy_mosaic')
shutil.rmtree(out_path, ignore_errors=True)
mds_kwargs = {'out': out_path, 'compression': 'zstd:9'}
dataframe_to_mds(df_spark.repartition(4), merge_index=True, mds_kwargs=mds_kwargs)

Output:

!du -achd1 {local_dir}

55M     ./data_loading/dummy_data.csv
4.4M	./data_loading/dummy_data.parquet
6.1M	./data_loading/dummy_mosaic
123M	./data_loading/dummy_numpy.npy
5.2M	./data_loading/dummy_spark
193M	total

@gtmdotme
Author

gtmdotme commented Oct 22, 2024

Q1. Databricks Runtime 15.4 LTS ML comes with streaming version 0.7.4, but the latest is 0.9.0. I upgraded to the latest version because, on the bundled version, I was running into an issue converting the array<int> datatype from PySpark to MDS format (which was fixed in a later release by you, @XiaohanZhangCMU, in this PR). Was upgrading to the latest version the right approach to fix this issue?

Q2. I have a column of type array<int> in the data and some of its entries are empty arrays (i.e., []). While converting to MDS, it throws this error:

ValueError: Attempting to encode a numpy array with 0 elements.

I tried setting those empty arrays to null, but that also gives an error:

AttributeError: 'NoneType' object has no attribute 'dtype'

Is there a solution to this?

Q3. On this page, I can't find a datatype for boolean values. Is there a recommended way to convert boolean columns from PySpark to MDS format? Here is the error:

ValueError: ArrayType(BooleanType(), True) is not supported by MDSWriter

@XiaohanZhangCMU
Collaborator

@gtmdotme The answer to Q1 is yes. You will need to upgrade to a release that has the array encoder.
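
For example, in a notebook cell (restart Python afterwards so the new version is picked up):

%pip install --upgrade mosaicml-streaming
dbutils.library.restartPython()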

For Q2, it's not a desired failure, but it is somewhat expected. The MDS converter cannot do any imputation; it expects every record to have the same dtype and a valid, non-empty value.
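
One possible workaround (just a sketch on the Spark side, with a hypothetical column name) is to drop the rows with empty arrays, or fill them with a sentinel value, before calling dataframe_to_mds:

from pyspark.sql import functions as F

# Option A: drop rows whose array column is empty.
df_clean = df_spark.filter(F.size(F.col('my_array_col')) > 0)

# Option B: replace empty arrays with a sentinel so every record has a non-empty array<int>.
df_filled = df_spark.withColumn(
    'my_array_col',
    F.when(F.size(F.col('my_array_col')) == 0, F.array(F.lit(-1)))
     .otherwise(F.col('my_array_col')))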

Q3: you are right, there is no boolean type currently. I think a workaround is casting it to an integer and using "int".
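
For example (the column names are hypothetical):

from pyspark.sql import functions as F

# Cast a boolean column to int before converting to MDS.
df_int = df_spark.withColumn('my_bool_col', F.col('my_bool_col').cast('int'))
# For array<boolean> columns, cast to array<int> instead.
df_int = df_int.withColumn('my_bool_array', F.col('my_bool_array').cast('array<int>'))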

@gtmdotme
Author

Thanks so much for the clarification.

I think there are two limitations, and correspondingly two possible improvements for a future release:

  • One, accommodating null values in the array datatype (which is a common scenario in ML).
  • Two, adding a bool datatype to MosaicML's datatype catalog (using int instead of bool can take as much as 8x the space).
