From 5e76770222966248886b0ec92f76df2669dee6f8 Mon Sep 17 00:00:00 2001
From: DylanAlloy
Date: Sat, 23 Dec 2023 08:35:24 -0500
Subject: [PATCH] =?UTF-8?q?=F0=9F=93=96=20feat(docs):=20magnet.ize?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 magnet/ize/filings.py | 130 +++++++++++++++++++++++++++++++++++++++---
 magnet/ize/memory.py  | 125 ++++++++++++++++++++++++++++++++++------
 2 files changed, 232 insertions(+), 23 deletions(-)

diff --git a/magnet/ize/filings.py b/magnet/ize/filings.py
index bfb2060..f4c7895 100644
--- a/magnet/ize/filings.py
+++ b/magnet/ize/filings.py
@@ -4,12 +4,58 @@
 from tqdm import tqdm
 from magnet.utils.data_classes import *
+"""
+The `Processor` class is responsible for loading, processing, and saving data. It provides methods for loading data from different file formats, splitting the data into smaller chunks, and saving the processed data to a file.
+
+Example Usage:
+    # Initialize the Processor class object
+    processor = Processor()
+
+    # Load data into the processor
+    processor.load(raw_data, text_column="clean", id_column="id")
+
+    # Process and split the loaded data into smaller chunks
+    await processor.process(path="processed_data.csv", splitter=None, nlp=True)
+
+    # Save the processed data to a file
+    processor.save(filename="processed_data.csv", raw=processor.df)
+
+Main functionalities:
+- Loading data from different file formats (csv, json, xlsx, parquet) or a pandas DataFrame
+- Splitting the loaded data into smaller chunks, either by sentences using natural language processing (NLP) or by a fixed character length
+- Saving the processed data to a file in different formats (csv, json, xlsx, parquet)
+
+Methods:
+- __init__(): Initializes the Processor object, sets the df attribute to None, and creates an instance of the Utils class.
+- save(filename: str = None, raw: pd.DataFrame = None): Saves the pandas DataFrame to a file with the specified filename and file format.
+- load(raw: str | pd.DataFrame = None, text_column: str = "clean", id_column: str = 'id'): Loads data into the df attribute of the Processor class.
+- process(path: str = None, splitter: any = None, nlp=True): Processes and splits the loaded data into smaller chunks.
+- default_splitter(data, window_size=768, overlap=76, nlp=True): Splits the given input data into smaller chunks, either by sentences or by a fixed character length.
+
+Fields:
+- df: A pandas DataFrame that stores the loaded data.
+- utils: An instance of the Utils class that provides utility functions.
+"""
 class Processor:
     def __init__(self):
         self.df = None
         self.utils = Utils()
     def save(self, filename: str = None, raw: pd.DataFrame = None):
+        """
+        Save the pandas DataFrame to a file with the specified filename and file format.
+
+        Args:
+            filename (str): The name of the file to save the data to.
+            raw (pd.DataFrame): The pandas DataFrame containing the data to be saved.
+
+        Raises:
+            ValueError: If the data format is unsupported or an error occurs during the saving process.
+
+        Returns:
+            None: If an error occurs during the saving process.
+            str: Success message if the data is successfully saved to the specified file.
+ """ try: file_extension = os.path.splitext(filename)[-1] file_handlers = { @@ -26,6 +72,21 @@ def save(self, filename: str = None, raw: pd.DataFrame = None): except Exception as e: _f("fatal", e) def load(self, raw: str | pd.DataFrame = None, text_column: str = "clean", id_column: str = 'id'): + """ + Load data into the df attribute of the Processor class. + + Args: + raw (str | pd.DataFrame): The input data to be loaded. It can be either a file path (str) or a pandas DataFrame. + text_column (str): The name of the column in the input data that contains the text data. Default is "clean". + id_column (str): The name of the column in the input data that contains the unique identifier for each data entry. Default is "id". + + Raises: + ValueError: If the file extension is not supported or an exception occurs during the loading process. + + Returns: + None: If an error occurs during the loading process. + str: Success message if the data is successfully loaded. + """ self.id_column = id_column self.text_column = text_column try: @@ -42,16 +103,30 @@ def load(self, raw: str | pd.DataFrame = None, text_column: str = "clean", id_co _f("wait", f"loading - {raw_data_dir}") self.df = file_handlers[file_extension](raw_data_dir) _f("success", f"loaded - {raw_data_dir}") + return f"Data successfully loaded from {raw_data_dir}" else: - _f("fatal", "unsupported file type") + raise ValueError("Unsupported file type") elif isinstance(raw, pd.DataFrame): self.df = raw _f("success", f"loaded - {raw}") + return f"Data successfully loaded from DataFrame" else: - _f("fatal", "data type not in [csv, json, xlsx, parquet, pd.DataFrame]") + raise ValueError("Data type not in [csv, json, xlsx, parquet, pd.DataFrame]") except Exception as e: - _f("fatal", e) + raise ValueError(str(e)) async def process(self, path: str = None, splitter: any = None, nlp=True): + """ + Process and split the loaded data into smaller chunks. + + Args: + path (str): The path to save the processed data. + splitter (function, optional): A custom function to split the text into chunks. If not provided, the default splitter function in the `Processor` class will be used. + nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length. + + Returns: + None: If there is no data loaded or if an error occurs during the processing. + The processed data saved to the specified file path. + """ self.df = self.df.dropna() if self.df is not None: try: @@ -82,23 +157,64 @@ async def process(self, path: str = None, splitter: any = None, nlp=True): return _f("fatal", "no data loaded!") async def create_charge(self, field=None): + """ + Process and send data to a field. + + Args: + field (optional): The field object to which the data will be sent. + + Returns: + None + + Raises: + ValueError: If no data is loaded. + ValueError: If no field is loaded. 
+
+        Example Usage:
+            # Initialize the Processor class object
+            processor = Processor()
+
+            # Load data into the processor
+            processor.load(raw_data, text_column="clean", id_column="id")
+
+            # Initialize the field object
+            field = Field()
+
+            # Create charges and send data to the field
+            await processor.create_charge(field)
+        """
         if self.df is not None and field is not None:
             self.field = field
             for i in tqdm(range(len(self.df))):
                 d = self.df[self.id_column].iloc[i]
                 c = self.df[self.text_column].iloc[i]
                 payload = Payload(
-                    document = d
-                    , text = c
+                    document=d,
+                    text=c
                 )
                 if self.field:
                     await self.field.pulse(payload)
                 else:
-                    _f('fatal', 'no field initialized')
+                    raise ValueError('No field initialized')
         else:
-            return _f("fatal", "no data loaded!") if field else _f('fatal', 'no field loaded!')
+            if self.df is None:
+                raise ValueError('No data loaded!')
+            else:
+                raise ValueError('No field loaded!')
     def default_splitter(self, data, window_size=768, overlap=76, nlp=True):
+        """
+        Splits the given input data into smaller chunks, either by sentences or by a fixed character length.
+
+        Args:
+            data (str): The input data to be split.
+            window_size (int, optional): The size of each chunk when splitting by a fixed character length. Defaults to 768.
+            overlap (int, optional): The number of characters to overlap between chunks when splitting by a fixed character length. Defaults to 76.
+            nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length. Defaults to True.
+
+        Returns:
+            list: If `nlp` is True, returns a list of sentences extracted from the input data. If `nlp` is False, returns a list of fixed-size chunks.
+        """
         if nlp:
             self.utils.nlp.max_length = len(data) + 100
             sentences = [str(x) for x in self.utils.nlp(data).sents]
diff --git a/magnet/ize/memory.py b/magnet/ize/memory.py
index c5be962..0e9a890 100644
--- a/magnet/ize/memory.py
+++ b/magnet/ize/memory.py
@@ -4,6 +4,24 @@
 from magnet.utils.data_classes import EmbeddingPayload

 class Embedder:
+    """
+    The Embedder class is responsible for embedding text using a pre-trained sentence transformer model and storing or sending the embeddings for further processing. It uses the Milvus database for storing and searching the embeddings.
+
+    Args:
+        config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters.
+        create (bool, optional): If set to True, a connection to the Milvus database will be created. Defaults to False.
+
+    Attributes:
+        config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters.
+        model (SentenceTransformer): An instance of the SentenceTransformer class from the sentence_transformers library, used for text embedding.
+        db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database.
+
+    Methods:
+        embed_and_store(self, payload, verbose=False, field=None): Embeds the given payload using a pre-trained sentence transformer model and stores it in the Milvus database.
+        embed_and_charge(self, payload, verbose=False, field=None): Embeds the given payload using a pre-trained sentence transformer model and sends it to a specified field for further processing.
+ + """ + def __init__(self, config, create=False): self.config = config self.model = SentenceTransformer(self.config['MODEL']) @@ -11,7 +29,23 @@ def __init__(self, config, create=False): self.db.on() if create: self.db.create() + async def embed_and_store(self, payload, verbose=False, field=None): + """ + Embeds the given payload using a pre-trained sentence transformer model and stores it in a Milvus database. + + Args: + payload (object): An object containing the text and document attributes to be embedded and stored. + verbose (bool, optional): A boolean indicating whether additional information should be logged during the embedding process. Defaults to False. + field (object, optional): An object representing the field to which the encoded payload will be sent for further processing. + + Returns: + None + + Raises: + Exception: If an error occurs during the embedding or storing process. + + """ if field: self.field = field try: @@ -30,45 +64,83 @@ async def embed_and_store(self, payload, verbose=False, field=None): self.db.collection.flush(collection_name_array=[self.config['INDEX']]) except Exception as e: _f('fatal',e) + async def embed_and_charge(self, payload, verbose=False, field=None): + """ + Embeds the given payload using a pre-trained sentence transformer model and sends it to a specified field for further processing. + + Args: + payload (object): The payload to be embedded and charged. It should contain the `text` and `document` attributes. + verbose (bool, optional): If set to True, additional information will be logged during the embedding process. Defaults to False. + field (object): The field to which the encoded payload will be sent for further processing. + + Raises: + ValueError: If the `field` parameter is not provided. + + Returns: + None + + Example Usage: + config = { + 'MODEL': 'bert-base-nli-mean-tokens', + 'INDEX': 'my_index', + 'INDEX_PARAMS': {'nprobe': 16} + } + embedder = Embedder(config) + await embedder.embed_and_charge(payload, verbose=True, field=my_field) + + """ if field: self.field = field else: - _f('fatal','field is required') + raise ValueError('field is required') try: - _f('info','embedding payload') if verbose else None + if verbose: + _f('info', 'embedding payload') payload = EmbeddingPayload( - model = self.config['MODEL'] - , embedding = self.model.encode(f"Represent this sentence for searching relevant passages: {payload.text}", normalize_embeddings=True).tolist() - , text = payload.text - , document = payload.document - ) - _f('info',f'sending payload\n{payload}') if verbose else None + model=self.config['MODEL'], + embedding=self.model.encode(f"Represent this sentence for searching relevant passages: {payload.text}", normalize_embeddings=True).tolist(), + text=payload.text, + document=payload.document + ) + if verbose: + _f('info', f'sending payload\n{payload}') await self.field.pulse(payload) except Exception as e: - _f('fatal',e) + _f('fatal', e) def search(self, payload, limit=100, cb=None): + """ + Search for relevant passages based on a given input payload. + + Args: + payload (str): The search query or input payload. + limit (int, optional): The maximum number of results to return. Defaults to 100. + cb (function, optional): A callback function to process the results. Defaults to None. + + Returns: + list: A list of dictionaries, each containing the text, document, and distance of a relevant passage. 
+ """ payload = EmbeddingPayload( - text = payload - , embedding = self.model.encode(f"Represent this sentence for searching relevant passages: {payload}", normalize_embeddings=True) - , model = self.config['MODEL'] + text=payload, + embedding=self.model.encode(f"Represent this sentence for searching relevant passages: {payload}", normalize_embeddings=True), + model=self.config['MODEL'] ) self.db.collection.load() _results = self.db.collection.search( data=[payload.embedding], # Embeded search value anns_field="embedding", # Search across embeddings param=self.config['INDEX_PARAMS'], - limit = limit, # Limit to top_k results per search + limit=limit, # Limit to top_k results per search output_fields=['text', 'document'] ) results = [] for hits_i, hits in enumerate(_results): for hit in hits: results.append({ - 'text':hit.entity.get('text') - , 'document': hit.entity.get('document') - , 'distance': hit.distance + 'text': hit.entity.get('text'), + 'document': hit.entity.get('document'), + 'distance': hit.distance }) if cb: return cb(payload.text, results) @@ -79,6 +151,27 @@ def info(self): def disconnect(self): return self.db.off() def delete(self, name=None): + """ + Delete an index from the Milvus database. + + Args: + name (str, optional): The name of the index to be deleted. If not provided, no index will be deleted. + + Returns: + None + + Raises: + Exception: If an error occurs during the deletion process. + + Example Usage: + config = { + 'MODEL': 'bert-base-nli-mean-tokens', + 'INDEX': 'my_index', + 'INDEX_PARAMS': {'nprobe': 16} + } + embedder = Embedder(config) + embedder.delete('my_index') + """ if name and name==self.config['INDEX']: try: self.db.delete_index()