📖 feat(docs): magnet.ize
mxchinegod committed Dec 23, 2023
1 parent 01a7ac4 commit 5e76770
Showing 2 changed files with 232 additions and 23 deletions.
130 changes: 123 additions & 7 deletions magnet/ize/filings.py
@@ -4,12 +4,58 @@
from tqdm import tqdm
from magnet.utils.data_classes import *

"""
The `Processor` class is responsible for loading, processing, and saving data. It provides methods for loading data from different file formats, splitting the data into smaller chunks, and saving the processed data to a file.
Example Usage:
# Initialize the Processor class object
processor = Processor()
# Load data into the processor
processor.load(raw_data, text_column="clean", id_column="id")
# Process and split the loaded data into smaller chunks
await processor.process(path="processed_data.csv", splitter=None, nlp=True)
# Save the processed data to a file
processor.save(filename="processed_data.csv", raw=processor.df)
Main functionalities:
- Loading data from different file formats (csv, json, xlsx, parquet) or a pandas DataFrame
- Splitting the loaded data into smaller chunks either by sentences using natural language processing (NLP) or by a fixed character length
- Saving the processed data to a file in different formats (csv, json, xlsx, parquet)
Methods:
- __init__(): Initializes the Processor, setting the df attribute to None and creating an instance of the Utils class.
- save(filename: str = None, raw: pd.DataFrame = None): Saves the pandas DataFrame to a file with the specified filename and file format.
- load(raw: str | pd.DataFrame = None, text_column: str = "clean", id_column: str = 'id'): Loads data into the df attribute of the Processor class.
- process(path: str = None, splitter: any = None, nlp=True): Processes and splits the loaded data into smaller chunks.
- default_splitter(data, window_size=768, overlap=76, nlp=True): Splits the given input data into smaller chunks either by sentences or by a fixed character length.
Fields:
- df: A pandas DataFrame that stores the loaded data.
- utils: An instance of the Utils class that provides utility functions.
"""
class Processor:
def __init__(self):
    self.df = None
    self.utils = Utils()

def save(self, filename: str = None, raw: pd.DataFrame = None):
"""
Save the pandas DataFrame to a file with the specified filename and file format.
Args:
filename (str): The name of the file to save the data to.
raw (pd.DataFrame): The pandas DataFrame containing the data to be saved.
Raises:
ValueError: If the data format is unsupported or an error occurs during the saving process.
Returns:
None: If an error occurs during the saving process.
str: Success message if the data is successfully saved to the specified file.
"""
try:
    file_extension = os.path.splitext(filename)[-1]
    file_handlers = {
@@ -26,6 +72,21 @@ def save(self, filename: str = None, raw: pd.DataFrame = None):
except Exception as e:
    _f("fatal", e)
def load(self, raw: str | pd.DataFrame = None, text_column: str = "clean", id_column: str = 'id'):
"""
Load data into the df attribute of the Processor class.
Args:
raw (str | pd.DataFrame): The input data to be loaded. It can be either a file path (str) or a pandas DataFrame.
text_column (str): The name of the column in the input data that contains the text data. Default is "clean".
id_column (str): The name of the column in the input data that contains the unique identifier for each data entry. Default is "id".
Raises:
ValueError: If the file extension is not supported or an exception occurs during the loading process.
Returns:
None: If an error occurs during the loading process.
str: Success message if the data is successfully loaded.
"""
self.id_column = id_column
self.text_column = text_column
try:
@@ -42,16 +103,30 @@ def load(self, raw: str | pd.DataFrame = None, text_column: str = "clean", id_co
_f("wait", f"loading - {raw_data_dir}")
self.df = file_handlers[file_extension](raw_data_dir)
_f("success", f"loaded - {raw_data_dir}")
return f"Data successfully loaded from {raw_data_dir}"
else:
_f("fatal", "unsupported file type")
raise ValueError("Unsupported file type")
elif isinstance(raw, pd.DataFrame):
self.df = raw
_f("success", f"loaded - {raw}")
return f"Data successfully loaded from DataFrame"
else:
_f("fatal", "data type not in [csv, json, xlsx, parquet, pd.DataFrame]")
raise ValueError("Data type not in [csv, json, xlsx, parquet, pd.DataFrame]")
except Exception as e:
_f("fatal", e)
raise ValueError(str(e))
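
An illustrative call under assumptions (the file name is hypothetical; the column names are the documented defaults):

# Illustrative usage sketch; "filings.csv" and its columns are assumptions.
processor = Processor()
processor.load("filings.csv", text_column="clean", id_column="id")
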
async def process(self, path: str = None, splitter: any = None, nlp=True):
"""
Process and split the loaded data into smaller chunks.
Args:
path (str): The path to save the processed data.
splitter (function, optional): A custom function to split the text into chunks. If not provided, the default splitter function in the `Processor` class will be used.
nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length.
Returns:
None: If there is no data loaded or if an error occurs during processing.
Otherwise, the processed data is written to the file at the specified path.
"""
if self.df is not None:
    self.df = self.df.dropna()
    try:
@@ -82,23 +157,64 @@ async def process(self, path: str = None, splitter: any = None, nlp=True):
return _f("fatal", "no data loaded!")

async def create_charge(self, field=None):
"""
Process and send data to a field.
Args:
field (optional): The field object to which the data will be sent.
Returns:
None
Raises:
ValueError: If no data is loaded.
ValueError: If no field is loaded.
Example Usage:
# Initialize the Processor class object
processor = Processor()
# Load data into the processor
processor.load(raw_data, text_column="clean", id_column="id")
# Initialize the field object
field = Field()
# Create charges and send data to the field
await processor.create_charge(field)
"""
if self.df is not None and field is not None:
    self.field = field
    for i in tqdm(range(len(self.df))):
        d = self.df[self.id_column].iloc[i]
        c = self.df[self.text_column].iloc[i]
        payload = Payload(
            document=d,
            text=c
        )
        if self.field:
            await self.field.pulse(payload)
        else:
            _f('fatal', 'no field initialized')
            raise ValueError('No field initialized')
elif self.df is None:
    _f("fatal", "no data loaded!")
    raise ValueError("No data loaded!")
else:
    _f("fatal", "no field loaded!")
    raise ValueError("No field loaded!")
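
Payload is imported from magnet.utils.data_classes and is not shown in this diff; a plausible sketch consistent with the keyword arguments above (the field types are assumptions):

# Hypothetical sketch of the imported data class -- not part of this diff.
from dataclasses import dataclass

@dataclass
class Payload:
    document: str  # value taken from id_column
    text: str      # value taken from text_column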

def default_splitter(self, data, window_size=768, overlap=76, nlp=True):
"""
Splits the given input data into smaller chunks either by sentences or by a fixed character length.
Args:
data (str): The input data to be split.
window_size (int, optional): The size of each chunk when splitting by a fixed character length. Defaults to 768.
overlap (int, optional): The number of characters to overlap between each chunk when splitting by a fixed character length. Defaults to 76.
nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length. Defaults to True.
Returns:
list: If `nlp` is True, returns a list of sentences extracted from the input data. If `nlp` is False, returns a list of chunks obtained by splitting the input data into fixed-sized chunks.
"""
if nlp:
    self.utils.nlp.max_length = len(data) + 100
    sentences = [str(x) for x in self.utils.nlp(data).sents]
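
The fixed-length branch is elided below this point; a sketch of how a window-with-overlap split could work, consistent with the window_size and overlap defaults (an assumption, not the commit's actual code):

# Hypothetical sketch of the elided non-NLP branch.
def fixed_window_split(data, window_size=768, overlap=76):
    step = window_size - overlap  # each 768-char window advances by 692 chars
    return [data[i:i + window_size] for i in range(0, len(data), step)]
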
125 changes: 109 additions & 16 deletions magnet/ize/memory.py
@@ -4,14 +4,48 @@
from magnet.utils.data_classes import EmbeddingPayload

class Embedder:
"""
The Embedder class is responsible for embedding text using a pre-trained sentence transformer model and storing or sending the embeddings for further processing. It utilizes the Milvus database for storing and searching the embeddings.
Args:
config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters.
create (bool, optional): If set to True, a connection to the Milvus database will be created. Defaults to False.
Attributes:
config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters.
model (SentenceTransformer): An instance of the SentenceTransformer class from the sentence_transformers library, used for text embedding.
db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database.
Methods:
embed_and_store(self, payload, verbose=False, field=None): Embeds the given payload using a pre-trained sentence transformer model and stores it in the Milvus database.
embed_and_charge(self, payload, verbose=False, field=None): Embeds the given payload using a pre-trained sentence transformer model and sends it to a specified field for further processing.
"""

def __init__(self, config, create=False):
    self.config = config
    self.model = SentenceTransformer(self.config['MODEL'])
    self.db = MilvusDB(self.config)
    self.db.on()
    if create:
        self.db.create()
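
Construction can be sketched by reusing the example config values from the docstrings below; a minimal, illustrative setup:

# Illustrative construction; the config values come from the docstring examples below.
config = {
    'MODEL': 'bert-base-nli-mean-tokens',
    'INDEX': 'my_index',
    'INDEX_PARAMS': {'nprobe': 16}
}
embedder = Embedder(config, create=True)  # create=True also calls self.db.create()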

async def embed_and_store(self, payload, verbose=False, field=None):
"""
Embeds the given payload using a pre-trained sentence transformer model and stores it in a Milvus database.
Args:
payload (object): An object containing the text and document attributes to be embedded and stored.
verbose (bool, optional): A boolean indicating whether additional information should be logged during the embedding process. Defaults to False.
field (object, optional): An object representing the field to which the encoded payload will be sent for further processing.
Returns:
None
Raises:
Exception: If an error occurs during the embedding or storing process.
"""
if field:
    self.field = field
try:
@@ -30,45 +64,83 @@ async def embed_and_store(self, payload, verbose=False, field=None):
    self.db.collection.flush(collection_name_array=[self.config['INDEX']])
except Exception as e:
    _f('fatal', e)
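
An illustrative call, assuming a payload shaped like the Payload objects produced by Processor.create_charge:

# Illustrative sketch; `payload` is assumed to expose .text and .document.
await embedder.embed_and_store(payload, verbose=True)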

async def embed_and_charge(self, payload, verbose=False, field=None):
"""
Embeds the given payload using a pre-trained sentence transformer model and sends it to a specified field for further processing.
Args:
payload (object): The payload to be embedded and charged. It should contain the `text` and `document` attributes.
verbose (bool, optional): If set to True, additional information will be logged during the embedding process. Defaults to False.
field (object): The field to which the encoded payload will be sent for further processing.
Raises:
ValueError: If the `field` parameter is not provided.
Returns:
None
Example Usage:
config = {
'MODEL': 'bert-base-nli-mean-tokens',
'INDEX': 'my_index',
'INDEX_PARAMS': {'nprobe': 16}
}
embedder = Embedder(config)
await embedder.embed_and_charge(payload, verbose=True, field=my_field)
"""
if field:
    self.field = field
else:
    _f('fatal', 'field is required')
    raise ValueError('field is required')
try:
    if verbose:
        _f('info', 'embedding payload')
    payload = EmbeddingPayload(
        model=self.config['MODEL'],
        embedding=self.model.encode(f"Represent this sentence for searching relevant passages: {payload.text}", normalize_embeddings=True).tolist(),
        text=payload.text,
        document=payload.document
    )
    if verbose:
        _f('info', f'sending payload\n{payload}')
    await self.field.pulse(payload)
except Exception as e:
    _f('fatal', e)
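
EmbeddingPayload is likewise imported from magnet.utils.data_classes; a plausible sketch consistent with the keyword arguments used in this file (the types and the document default are assumptions; search() below omits document):

# Hypothetical sketch of the imported data class -- not part of this diff.
from dataclasses import dataclass

@dataclass
class EmbeddingPayload:
    model: str
    embedding: list       # dense vector from SentenceTransformer.encode(...)
    text: str
    document: str = None  # omitted by search() below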

def search(self, payload, limit=100, cb=None):
"""
Search for relevant passages based on a given input payload.
Args:
payload (str): The search query or input payload.
limit (int, optional): The maximum number of results to return. Defaults to 100.
cb (function, optional): A callback function to process the results. Defaults to None.
Returns:
list: A list of dictionaries, each containing the text, document, and distance of a relevant passage.
"""
payload = EmbeddingPayload(
    text=payload,
    embedding=self.model.encode(f"Represent this sentence for searching relevant passages: {payload}", normalize_embeddings=True),
    model=self.config['MODEL']
)
self.db.collection.load()
_results = self.db.collection.search(
    data=[payload.embedding],  # Embedded search value
    anns_field="embedding",  # Search across embeddings
    param=self.config['INDEX_PARAMS'],
    limit=limit,  # Limit to top_k results per search
    output_fields=['text', 'document']
)
results = []
for hits_i, hits in enumerate(_results):
    for hit in hits:
        results.append({
            'text': hit.entity.get('text'),
            'document': hit.entity.get('document'),
            'distance': hit.distance
        })
if cb:
    return cb(payload.text, results)
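
An illustrative query, assuming the elided tail of the method returns the results list when no callback is supplied:

# Illustrative sketch; the query text is an assumption.
hits = embedder.search("What were the key revenue drivers?", limit=5)
for hit in hits:
    print(hit['distance'], hit['document'])
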
@@ -79,6 +151,27 @@ def info(self):
def disconnect(self):
    return self.db.off()
def delete(self, name=None):
"""
Delete an index from the Milvus database.
Args:
name (str, optional): The name of the index to be deleted. If not provided, no index will be deleted.
Returns:
None
Raises:
Exception: If an error occurs during the deletion process.
Example Usage:
config = {
'MODEL': 'bert-base-nli-mean-tokens',
'INDEX': 'my_index',
'INDEX_PARAMS': {'nprobe': 16}
}
embedder = Embedder(config)
embedder.delete('my_index')
"""
if name and name == self.config['INDEX']:
    try:
        self.db.delete_index()
