diff --git a/.buildinfo b/.buildinfo new file mode 100644 index 0000000..5672f25 --- /dev/null +++ b/.buildinfo @@ -0,0 +1,4 @@ +# Sphinx build info version 1 +# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. +config: ced2ed8930753438f7bfc8caa2ddd2f8 +tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md new file mode 100644 index 0000000..e0c7da8 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# GitHub Pages + +Last update of sphinx html documentation from [c0f2d9b](https://github.com/Prismadic/magnet/tree/c0f2d9be2b9b6268409282b738fda833f159fa11) diff --git a/_modules/index.html b/_modules/index.html new file mode 100644 index 0000000..275e6a1 --- /dev/null +++ b/_modules/index.html @@ -0,0 +1,271 @@ + + +
+ + +
+import nats, json
+from magnet.utils.globals import _f
+from dataclasses import asdict
+from nats.errors import TimeoutError
+from magnet.utils.data_classes import *
+from nats.js.api import StreamConfig
+import xxhash
+
+
+
+[docs]
+class Charge:
+ """
+ The `Charge` class is responsible for connecting to a NATS server, managing streams and categories, and publishing data to the server.
+
+ Args:
+ server (str): The NATS server URL.
+
+ Attributes:
+ server (str): The NATS server URL.
+ category (str): The current category.
+ stream (str): The current stream.
+ nc: The NATS connection object.
+ js: The JetStream API object.
+ """
+
+ def __init__(self, server):
+ self.server = server
+
+
+[docs]
+ async def on(self, category: str = 'no_category', stream: str = 'documents', create: bool = False):
+ """
+ Connects to the NATS server, creates a stream and category if they don't exist, and prints a success message.
+
+ Args:
+ category (str, optional): The category to connect to. Defaults to 'no_category'.
+ stream (str, optional): The stream to connect to. Defaults to 'documents'.
+ create (bool, optional): Whether to create the stream and category if they don't exist. Defaults to False.
+ """
+ self.category = category
+ self.stream = stream
+ try:
+ nc = await nats.connect(f'nats://{self.server}:4222')
+ self.nc = nc
+ self.js = self.nc.jetstream()
+ streams = await self.js.streams_info()
+ if self.stream not in [x.config.name for x in streams] or self.category not in sum([x.config.subjects for x in streams], []):
+ try:
+                    if self.stream not in [x.config.name for x in streams]:
+                        if create:
+                            _f("wait", f'creating {self.stream}')
+                            await self.js.add_stream(name=self.stream, subjects=[self.category])
+                        else:
+                            _f("warn", f"couldn't create {stream} on {self.server}")
+ streams = await self.js.streams_info()
+ if self.category not in sum([x.config.subjects for x in streams if x.config.name == self.stream], []):
+ subjects = sum([x.config.subjects for x in streams if x.config.name == self.stream], [])
+ subjects.append(self.category)
+ await self.js.update_stream(StreamConfig(
+ name = self.stream
+ , subjects = subjects
+ ))
+ _f("success", f'created [{self.category}] on\n🛰️ stream: {self.stream}')
+ except Exception as e:
+ _f('fatal', f"couldn't create {stream} on {self.server}\n{e}")
+            _f("success", f'connected to [{self.category}] on\n🛰️ stream: {self.stream}')
+        except TimeoutError:
+            _f('fatal', f'could not connect to {self.server}')
+
+
+
+[docs]
+ async def off(self):
+ """
+ Disconnects from the NATS server and prints a warning message.
+ """
+ await self.nc.drain()
+ _f('warn', f'disconnected from {self.server}')
+
+
+
+[docs]
+ async def pulse(self, payload):
+ """
+ Publishes data to the NATS server using the specified category and payload.
+
+ Args:
+ payload (dict): The data to be published.
+ """
+ try:
+ bytes_ = json.dumps(asdict(payload), separators=(', ', ':')).encode('utf-8')
+ except Exception as e:
+ _f('fatal', f'invalid JSON\n{e}')
+ try:
+            _hash = xxhash.xxh64(bytes_).hexdigest()
+ await self.js.publish(
+ self.category
+ , bytes_
+ , headers={
+ "Nats-Msg-Id": _hash
+ }
+ )
+ except Exception as e:
+ _f('fatal', f'could not send data to {self.server}\n{e}')
+
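+            # note (sketch): the Nats-Msg-Id header above is an xxh64 digest of the
+            # payload bytes, so re-publishing an identical payload lets JetStream
+            # deduplicate it server-side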
+
+[docs]
+    async def excite(self, job: dict = None):
+        """
+        Publishes a job to the NATS server using the current category.
+
+        Args:
+            job (dict, optional): The job data to be published. Defaults to an empty dict.
+        """
+        job = job if job is not None else {}
+ try:
+ bytes_ = json.dumps(job, separators=(', ', ':')).encode('utf-8')
+ except Exception as e:
+ _f('fatal', f'invalid JSON\n{e}')
+ try:
+            _hash = xxhash.xxh64(bytes_).hexdigest()
+ await self.js.publish(
+ self.category
+ , bytes_
+ , headers={
+ "Nats-Msg-Id": _hash
+ }
+ )
+ except Exception as e:
+ _f('fatal', f'could not send data to {self.server}\n{e}')
+
+
+
+
+[docs]
+ async def emp(self, name=None):
+ """
+ Deletes the specified stream if the name matches the current stream, or prints an error message if the name doesn't match or the stream doesn't exist.
+
+ Args:
+ name (str, optional): The name of the stream to delete. Defaults to None.
+ """
+ if name and name==self.stream:
+ await self.js.delete_stream(name=self.stream)
+ _f('warn', f'{self.stream} stream deleted')
+ else:
+ _f('fatal', "name doesn't match the stream or stream doesn't exist")
+
+
+
+[docs]
+ async def reset(self, name=None):
+ """
+ Purges the specified category if the name matches the current category, or prints an error message if the name doesn't match or the category doesn't exist.
+
+ Args:
+ name (str, optional): The name of the category to purge. Defaults to None.
+ """
+ if name and name==self.category:
+ await self.js.purge_stream(name=self.stream, subject=self.category)
+            _f('warn', f'{self.category} category purged')
+ else:
+ _f('fatal', "name doesn't match the stream category or category doesn't exist")
+
+
+
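+# --- usage sketch (illustrative, not part of the module) ---------------------
+# assumes a NATS server reachable on localhost and the Payload dataclass
+# defined in magnet's data classes module (shown later in this document)
+async def _charge_example():
+    field = Charge('localhost')
+    await field.on(category='tests', stream='documents', create=True)
+    await field.pulse(Payload(text='hello world', document='doc-1'))
+    await field.off()
+# run with: asyncio.run(_charge_example())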
+
+[docs]
+class Resonator:
+ """
+ The `Resonator` class is responsible for connecting to a NATS server, subscribing to a specific category in a stream, and consuming messages from that category.
+
+ Args:
+ server (str): The address of the NATS server.
+
+ Attributes:
+ server (str): The address of the NATS server.
+ category (str): The category to subscribe to.
+ stream (str): The stream to subscribe to.
+ session (str): The session name for durable subscriptions.
+ nc (nats.aio.client.Client): The NATS client.
+ js (nats.aio.client.JetStream): The JetStream client.
+ sub (nats.aio.client.Subscription): The subscription.
+
+ Methods:
+ __init__(self, server: str)
+    on(self, category: str = 'no_category', stream: str = 'documents', session='magnet') -> None
+    listen(self, cb=print) -> None
+    worker(self, cb=print) -> None
+ info(self, session: str = None) -> None
+ off(self) -> None
+ """
+
+ def __init__(self, server: str):
+ """
+ Initializes the `Resonator` class with the NATS server address.
+
+ Args:
+ server (str): The address of the NATS server.
+ """
+ self.server = server
+
+
+[docs]
+ async def on(self, category: str = 'no_category', stream: str = 'documents', session='magnet'):
+ """
+ Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
+
+ Args:
+ category (str, optional): The category to subscribe to. Defaults to 'no_category'.
+ stream (str, optional): The stream to subscribe to. Defaults to 'documents'.
+ session (str, optional): The session name for durable subscriptions. Defaults to 'magnet'.
+
+ Returns:
+ None
+
+ Raises:
+ TimeoutError: If there is a timeout error while connecting to the NATS server.
+ Exception: If there is an error in consuming the message or processing the callback function.
+ """
+ self.category = category
+ self.stream = stream
+ self.session = session
+ _f('wait',f'connecting to {self.server}')
+ try:
+ self.nc = await nats.connect(f'nats://{self.server}:4222')
+ self.js = self.nc.jetstream()
+ self.sub = await self.js.subscribe(self.category, durable=self.session)
+ _f("success", f'connected to {self.server}')
+ except TimeoutError:
+ _f("fatal", f'could not connect to {self.server}')
+
+
+[docs]
+ async def listen(self, cb=print):
+ """
+ Consume messages from a specific category in a stream.
+
+ Args:
+ cb (function, optional): The callback function to process the received messages. Defaults to `print`.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If there is an error in consuming the message or processing the callback function.
+ """
+ _f("info", f'consuming delta from [{self.category}] on\n🛰️ stream: {self.stream}\n🧲 session: "{self.session}"')
+ while True:
+ try:
+ msg = await self.sub.next_msg(timeout=60)
+ payload = Payload(**json.loads(msg.data))
+ try:
+ await cb(payload, msg)
+ except Exception as e:
+ _f("warn", f'retrying connection to {self.server}')
+ except Exception as e:
+                _f('fatal', f'could not process message\n{e}')
+
+
+
+[docs]
+ async def worker(self, cb=print):
+ """
+ Consume messages from a specific category in a stream and process them as jobs.
+
+ Args:
+ cb (function, optional): The callback function to process the received messages. Defaults to `print`.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If there is an error in consuming the message or processing the callback function.
+ """
+ _f("info", f'processing jobs from [{self.category}] on\n🛰️ stream: {self.stream}\n🧲 session: "{self.session}"')
+ try:
+ msg = await self.sub.next_msg(timeout=60)
+ payload = JobParams(**json.loads(msg.data))
+ try:
+ await cb(payload, msg)
+ except Exception as e:
+ _f("warn", f'something wrong in your callback function!\n{e}')
+ except Exception as e:
+            _f('fatal', f'could not process message\n{e}')
+
+
+
+[docs]
+ async def info(self, session: str = None):
+ """
+ Retrieves information about a consumer in a JetStream stream.
+
+        :param session: A string representing the session (durable consumer) name whose configuration should be retrieved.
+ :return: None
+ """
+ jsm = await self.js.consumer_info(stream=self.stream, consumer=session)
+ _f('info',json.dumps(jsm.config.__dict__, indent=2))
+
+
+[docs]
+ async def off(self):
+ """
+ Unsubscribes from the category and stream and disconnects from the NATS server.
+
+ :return: None
+ """
+ await self.sub.unsubscribe()
+ _f('warn', f'unsubscribed from {self.stream}')
+ await self.nc.drain()
+ _f('warn', f'disconnected from {self.server}')
+
+
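+# --- usage sketch (illustrative, not part of the module) ---------------------
+# consumes the messages published by the Charge sketch above; the callback
+# receives the decoded Payload and the raw NATS message
+async def _resonator_example():
+    async def handle(payload, msg):
+        print(payload.text)
+        await msg.ack()
+    resonator = Resonator('localhost')
+    await resonator.on(category='tests', stream='documents', session='magnet')
+    await resonator.listen(cb=handle)
+# run with: asyncio.run(_resonator_example())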
+
+import pandas as pd
+import os
+from magnet.utils.globals import _f, Utils
+from tqdm import tqdm
+from magnet.utils.data_classes import *
+
+
+[docs]
+class Processor:
+ """
+ The `Processor` class is responsible for loading, processing, and saving data. It provides methods for loading data from different file formats, splitting the data into smaller chunks, and saving the processed data to a file.
+
+ Example Usage:
+ # Initialize the Processor class object
+ processor = Processor()
+
+ # Load data into the processor
+ processor.load(raw_data, text_column="clean", id_column="id")
+
+ # Process and split the loaded data into smaller chunks
+ await processor.process(path="processed_data.csv", splitter=None, nlp=True)
+
+ # Save the processed data to a file
+ processor.save(filename="processed_data.csv", raw=processor.df)
+
+ Main functionalities:
+ - Loading data from different file formats (csv, json, xlsx, parquet) or a pandas DataFrame
+ - Splitting the loaded data into smaller chunks either by sentences using natural language processing (NLP) or by a fixed character length
+ - Saving the processed data to a file in different formats (csv, json, xlsx, parquet)
+
+ Methods:
+ - __init__(): Initializes the Processor class object and sets the df attribute to None and creates an instance of the Utils class.
+ - save(filename: str = None, raw: pd.DataFrame = None): Saves the pandas DataFrame to a file with the specified filename and file format.
+ - load(raw: str | pd.DataFrame = None, text_column: str = "clean", id_column: str = 'id'): Loads data into the df attribute of the Processor class.
+ - process(path: str = None, splitter: any = None, nlp=True): Processes and splits the loaded data into smaller chunks.
+ - default_splitter(data, window_size=768, overlap=76, nlp=True): Splits the given input data into smaller chunks either by sentences or by a fixed character length.
+
+ Fields:
+ - df: A pandas DataFrame that stores the loaded data.
+ - utils: An instance of the Utils class that provides utility functions.
+ """
+ def __init__(self):
+ self.df = None
+ self.utils = Utils()
+
+
+[docs]
+ def save(self, filename: str = None, raw: pd.DataFrame = None):
+ """
+ Save the pandas DataFrame to a file with the specified filename and file format.
+
+ Args:
+ filename (str): The name of the file to save the data to.
+ raw (pd.DataFrame): The pandas DataFrame containing the data to be saved.
+
+ Raises:
+ ValueError: If the data format is unsupported or an error occurs during the saving process.
+
+ Returns:
+ None: If an error occurs during the saving process.
+ str: Success message if the data is successfully saved to the specified file.
+ """
+ try:
+ file_extension = os.path.splitext(filename)[-1]
+ file_handlers = {
+ ".csv": raw.to_csv,
+ ".json": raw.to_json,
+ ".xlsx": raw.to_excel,
+ ".parquet": raw.to_parquet,
+ }
+ if file_extension in file_handlers:
+ file_handlers[file_extension](filename)
+ _f("success", f"saved - {filename}")
+ else:
+ _f("fatal", "unsupported data")
+ except Exception as e:
+ _f("fatal", e)
+
+
+[docs]
+ def load(self, raw: str | pd.DataFrame = None, text_column: str = "clean", id_column: str = 'id'):
+ """
+ Load data into the df attribute of the Processor class.
+
+ Args:
+ raw (str | pd.DataFrame): The input data to be loaded. It can be either a file path (str) or a pandas DataFrame.
+ text_column (str): The name of the column in the input data that contains the text data. Default is "clean".
+ id_column (str): The name of the column in the input data that contains the unique identifier for each data entry. Default is "id".
+
+ Raises:
+ ValueError: If the file extension is not supported or an exception occurs during the loading process.
+
+ Returns:
+ None: If an error occurs during the loading process.
+ str: Success message if the data is successfully loaded.
+ """
+ self.id_column = id_column
+ self.text_column = text_column
+ try:
+ if isinstance(raw, str):
+ raw_data_dir = raw
+ file_extension = os.path.splitext(raw)[-1]
+ file_handlers = {
+ ".csv": pd.read_csv,
+ ".json": pd.read_json,
+ ".xlsx": pd.read_excel,
+ ".parquet": pd.read_parquet,
+ }
+ if file_extension in file_handlers:
+ _f("wait", f"loading - {raw_data_dir}")
+ self.df = file_handlers[file_extension](raw_data_dir)
+ _f("success", f"loaded - {raw_data_dir}")
+ return _f('success',f"data successfully loaded from {raw_data_dir}")
+ else:
+ raise ValueError("Unsupported file type")
+ elif isinstance(raw, pd.DataFrame):
+ self.df = raw
+ _f("success", f"loaded - {raw}")
+ return _f('success',f"data successfully loaded from DataFrame")
+ else:
+ raise ValueError("Data type not in [csv, json, xlsx, parquet, pd.DataFrame]")
+ except Exception as e:
+ raise ValueError(str(e))
+
+
+[docs]
+ async def process(self, path: str = None, splitter: any = None, nlp=True):
+ """
+ Process and split the loaded data into smaller chunks.
+
+ Args:
+ path (str): The path to save the processed data.
+ splitter (function, optional): A custom function to split the text into chunks. If not provided, the default splitter function in the `Processor` class will be used.
+ nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length.
+
+ Returns:
+ None: If there is no data loaded or if an error occurs during the processing.
+ The processed data saved to the specified file path.
+ """
+        if self.df is not None:
+            self.df = self.df.dropna()
+ try:
+ _f("wait", f"get coffee or tea - {len(self.df)} processing...")
+ sentence_splitter = self.default_splitter if splitter is None else splitter
+ chunks = []
+ knowledge_base = pd.DataFrame()
+ tqdm.pandas()
+ self.df["chunks"] = self.df[self.text_column].progress_apply(
+ lambda x: [
+ str(s) for s in sentence_splitter(self.utils.normalize_text(x), nlp=nlp)
+ ]
+ )
+ for i in tqdm(range(len(self.df))):
+ for c in self.df['chunks'].iloc[i]:
+ d = self.df[self.id_column].iloc[i]
+ chunks.append((d, c))
+ knowledge_base['id'] = [c[0] for c in chunks]
+ knowledge_base['chunks'] = [c[1] for c in chunks]
+
+ self.df = knowledge_base
+ _f('wait', f'saving to {path}')
+ self.save(path, self.df)
+ return
+ except Exception as e:
+ _f("fatal", e)
+ else:
+ return _f("fatal", "no data loaded!")
+
+
+
+[docs]
+ async def create_charge(self, field=None):
+ """
+ Process and send data to a field.
+
+ Args:
+ field (optional): The field object to which the data will be sent.
+
+ Returns:
+ None
+
+ Raises:
+ ValueError: If no data is loaded.
+ ValueError: If no field is loaded.
+
+ Example Usage:
+ # Initialize the Processor class object
+ processor = Processor()
+
+ # Load data into the processor
+ processor.load(raw_data, text_column="clean", id_column="id")
+
+ # Initialize the field object
+ field = Field()
+
+ # Create charges and send data to the field
+ await processor.create_charge(field)
+ """
+ if self.df is not None and field is not None:
+ self.field = field
+ for i in tqdm(range(len(self.df))):
+ d = self.df[self.id_column].iloc[i]
+ c = self.df[self.text_column].iloc[i]
+ payload = Payload(
+ document=d,
+ text=c
+ )
+ if self.field:
+ await self.field.pulse(payload)
+ else:
+ raise ValueError('No field initialized')
+ else:
+            if self.df is None:
+ raise ValueError('No data loaded!')
+ else:
+ raise ValueError('No field loaded!')
+
+
+
+[docs]
+ def default_splitter(self, data, window_size=768, overlap=76, nlp=True):
+ """
+ Splits the given input data into smaller chunks either by sentences or by a fixed character length.
+
+ Args:
+ data (str): The input data to be split.
+ window_size (int, optional): The size of each chunk when splitting by a fixed character length. Defaults to 768.
+ overlap (int, optional): The number of characters to overlap between each chunk when splitting by a fixed character length. Defaults to 76.
+ nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length. Defaults to True.
+
+ Returns:
+ list: If `nlp` is True, returns a list of sentences extracted from the input data. If `nlp` is False, returns a list of chunks obtained by splitting the input data into fixed-sized chunks.
+ """
+ if nlp:
+ self.utils.nlp.max_length = len(data) + 100
+ sentences = [str(x) for x in self.utils.nlp(data).sents]
+ return sentences
+ else:
+ # Perform chunked splitting by a fixed character length
+ chunks = []
+ start_char_idx = 0
+ while start_char_idx < len(data):
+ end_char_idx = start_char_idx + window_size
+ chunk = data[start_char_idx:end_char_idx]
+ chunks.append(chunk)
+ start_char_idx += (window_size - overlap)
+ return chunks
+
+
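+# --- usage sketch (illustrative) ---------------------------------------------
+# the fixed-length path advances window_size - overlap = 692 characters per
+# chunk, so consecutive chunks share 76 characters
+# >>> [len(c) for c in Processor().default_splitter("x" * 2000, nlp=False)]
+# [768, 768, 616]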
+
+from sentence_transformers import SentenceTransformer
+from magnet.utils.globals import _f
+from magnet.utils.milvus import *
+from magnet.utils.data_classes import EmbeddingPayload
+
+
+[docs]
+class Embedder:
+ """
+ The Embedder class is responsible for embedding text using a pre-trained sentence transformer model and storing or sending the embeddings for further processing. It utilizes the Milvus database for storing and searching the embeddings.
+
+ Args:
+ config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters.
+ create (bool, optional): If set to True, a connection to the Milvus database will be created. Defaults to False.
+
+ Attributes:
+ config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters.
+ model (SentenceTransformer): An instance of the SentenceTransformer class from the sentence_transformers library, used for text embedding.
+ db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database.
+
+ Methods:
+        index(self, payload, msg, verbose=False, field=None, charge=False, instruction=...): Embeds the given payload using the sentence transformer model and stores it in the Milvus database, optionally re-publishing it to a field.
+        search(self, payload, limit=100, cb=None, instruction=...): Embeds a query string and searches the Milvus database for relevant passages.
+
+ """
+
+ def __init__(self, config, create=False, initialize=False):
+ self.config = config
+ self.model = SentenceTransformer(self.config['MODEL'])
+ self.db = MilvusDB(self.config)
+ self.db.on()
+        if create:
+            self.db.create(overwrite=True)
+        if initialize:
+            self.db.initialize()
+            return
+        self.db.load()
+
+[docs]
+ async def index(self, payload, msg, verbose=False, field=None, charge=False, instruction: str = "Represent this sentence for searching relevant passages: "):
+ """
+ Embeds the given payload using a pre-trained sentence transformer model and stores it in a Milvus database.
+
+ Args:
+            payload (object): An object containing the text and document attributes to be embedded and stored.
+            msg (object): The NATS message being processed; it is acknowledged once indexing succeeds.
+            verbose (bool, optional): A boolean indicating whether additional information should be logged during the embedding process. Defaults to False.
+            field (object, optional): An object representing the field to which the encoded payload will be sent for further processing.
+            charge (bool, optional): Whether to re-publish the embedded payload to the field. Defaults to False.
+            instruction (str, optional): The prefix prepended to the text before encoding.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If an error occurs during the embedding or storing process.
+
+ """
+ if not msg or not payload:
+ return _f('fatal', 'no field message and/or payload to ack!')
+ if field:
+ self.field = field
+ try:
+ _f('info', 'embedding payload') if verbose else None
+ payload.embedding = self.model.encode(
+ f"{instruction} {payload.text}", normalize_embeddings=True)
+ except Exception as e:
+ _f('fatal', e)
+ else:
+ await msg.in_progress()
+ try:
+ _f('info', 'indexing payload') if verbose else None
+ if not self.is_dupe(q=payload.embedding):
+ self.db.collection.insert([
+ [payload.document], [payload.text], [payload.embedding]
+ ])
+ self.db.collection.flush(collection_name_array=[
+ self.config['INDEX']])
+ if charge:
+ payload = EmbeddingPayload(
+ model=self.config['MODEL'],
+                        embedding=payload.embedding.tolist(),
+ text=payload.text,
+ document=payload.document
+ )
+ if field:
+ _f('info', f'sending payload\n{payload}') if verbose else None
+ await self.field.pulse(payload)
+ await msg.ack()
+ else:
+ await msg.ack()
+ except Exception as e:
+ _f('fatal', e)
+
+
+
+[docs]
+ def search(self, payload, limit=100, cb=None, instruction="Represent this sentence for searching relevant passages: "):
+ """
+ Search for relevant passages based on a given input payload.
+
+ Args:
+ payload (str): The search query or input payload.
+ limit (int, optional): The maximum number of results to return. Defaults to 100.
+ cb (function, optional): A callback function to process the results. Defaults to None.
+
+ Returns:
+ list: A list of dictionaries, each containing the text, document, and distance of a relevant passage.
+ """
+ payload = EmbeddingPayload(
+ text=payload,
+ embedding=self.model.encode(
+ f"{instruction} {payload}", normalize_embeddings=True),
+ model=self.config['MODEL'],
+ document="none"
+ )
+
+ _results = self.db.collection.search(
+ data=[payload.embedding],
+ anns_field="embedding",
+ param=self.config['INDEX_PARAMS'],
+ limit=limit,
+ output_fields=['text', 'document']
+ )
+ results = []
+        for hits in _results:
+ for hit in hits:
+ results.append({
+ 'text': hit.entity.get('text'),
+ 'document': hit.entity.get('document'),
+ 'distance': hit.distance
+ })
+ if cb:
+ return cb(payload.text, results)
+ else:
+ return results
+
+
+
+
+
+
+
+
+
+[docs]
+ def delete(self, name=None):
+ """
+ Delete an index from the Milvus database.
+
+ Args:
+ name (str, optional): The name of the index to be deleted. If not provided, no index will be deleted.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If an error occurs during the deletion process.
+
+ Example Usage:
+ config = {
+ 'MODEL': 'bert-base-nli-mean-tokens',
+ 'INDEX': 'my_index',
+ 'INDEX_PARAMS': {'nprobe': 16}
+ }
+ embedder = Embedder(config)
+ embedder.delete('my_index')
+ """
+ if name and name == self.config['INDEX']:
+ try:
+ self.db.delete_index()
+ except Exception as e:
+ _f('fatal', e)
+ else:
+ _f('fatal', "name doesn't match the connection or the connection doesn't exist")
+
+
+
+[docs]
+ def is_dupe(self, q):
+ """
+ Check if a given query is a duplicate in the Milvus database.
+
+ Args:
+ q (object): The query embedding to check for duplicates.
+
+ Returns:
+ bool: True if the query embedding is a duplicate in the Milvus database, False otherwise.
+ """
+ match = self.db.collection.search(
+ data=[q]
+ , anns_field = "embedding"
+ , param=self.config['INDEX_PARAMS']
+ , output_fields=['text', 'document']
+ , limit=1
+ )
+        return len(match[0]) > 0 and sum(match[0].distances) == 0.0
+
+
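+# --- usage sketch (illustrative) ---------------------------------------------
+# a hypothetical end-to-end search; the model name, dimension and index
+# parameters below are assumptions and must match your Milvus deployment
+def _search_example():
+    config = {
+        'MODEL': 'BAAI/bge-small-en-v1.5',  # any sentence-transformers model
+        'INDEX': 'my_index',
+        'DIMENSION': 384,  # must equal the model's embedding size
+        'INDEX_PARAMS': {'index_type': 'IVF_FLAT', 'metric_type': 'L2', 'params': {'nlist': 128}},
+        'MILVUS_URI': 'localhost',
+        'MILVUS_PORT': 19530,
+    }
+    embedder = Embedder(config, create=True)
+    for hit in embedder.search('what is a magnet?', limit=5):
+        print(hit['distance'], hit['document'], hit['text'][:80])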
+
+from dataclasses import dataclass
+
+
+[docs]
+@dataclass
+class Payload:
+ """
+ Represents a payload with two main fields: text and document.
+
+ Args:
+ text (str): The text associated with the payload.
+ document (str): The document associated with the payload.
+ """
+ text: str
+ document: str
+
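+# sketch: Charge.pulse serializes a Payload with dataclasses.asdict + json, e.g.
+# >>> asdict(Payload(text='hello', document='doc-1'))
+# {'text': 'hello', 'document': 'doc-1'}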
+
+
+[docs]
+@dataclass
+class GeneratedPayload:
+ """
+ Represents a payload generated by a system.
+
+ Args:
+ query (str): The query associated with the payload.
+ prompt (str): The prompt associated with the payload.
+ context (list): The context associated with the payload.
+ result (str): The result generated by the system.
+ model (str): The model used to generate the payload.
+ """
+ query: str
+ prompt: str
+ context: list
+ result: str
+ model: str
+
+
+
+[docs]
+@dataclass
+class EmbeddingPayload:
+ """
+ Represents a payload for embedding text data.
+
+ Attributes:
+ document (str): The document associated with the text data.
+ embedding (list): The embedding of the text data.
+        text (str): The text of the data.
+ model (str): The model used for embedding the text data.
+ """
+ document: str
+ embedding: list
+    text: str
+ model: str
+
+
+
+[docs]
+@dataclass
+class MistralArgs:
+ """
+ Represents a set of arguments for the Mistral model.
+
+ Args:
+ dim (int): The dimensionality of the model.
+ n_layers (int): The number of layers in the model.
+ head_dim (int): The dimensionality of each attention head.
+ hidden_dim (int): The dimensionality of the hidden layer in the feed-forward network.
+ n_heads (int): The number of attention heads.
+ n_kv_heads (int): The number of attention heads used for key-value attention.
+ norm_eps (float): The epsilon value used for numerical stability in layer normalization.
+ vocab_size (int): The size of the vocabulary used in the model.
+ """
+
+ dim: int
+ n_layers: int
+ head_dim: int
+ hidden_dim: int
+ n_heads: int
+ n_kv_heads: int
+ norm_eps: float
+ vocab_size: int
+
+
+
+[docs]
+@dataclass
+class JobParams:
+ milvus_host: str
+ milvus_port: int
+ milvus_username: str
+ milvus_password: str
+ milvus_collection: str
+ nats_host: str
+ nats_username: str
+ nats_password: str
+ nats_stream: str
+ nats_category: str
+ job_type: str
+ job_range: tuple
+ embedding_model: str
+ generation_model: str
+
+
+import re
+import os
+import torch
+import random
+import boto3
+from spacy.lang.en import English
+import inspect
+
+
+
+
+
+
+def _f(
+ tag: str = None,
+ body: any = None,
+ no_print: bool = False,
+ luxe: bool = False
+):
+ """
+ The `_f` function is a logging utility that prints messages with different tags and colors based on
+ the provided parameters.
+
+ :param tag: The `tag` parameter is a string that represents the tag for the log message. It can be
+ one of the following values: "FATAL", "WARN", "INFO", "WAIT", or "SUCCESS"
+ :type tag: str
+ :param body: The `body` parameter is used to specify the message or content that you want to
+ display. It can be of any type
+ :type body: any
+    :param no_print: The `no_print` parameter is a boolean flag that determines whether the output
+    should be printed or returned as a string. If `no_print` is set to `True`, the function returns
+    the formatted string without printing it. If `no_print` is set to `False` (default), the string is printed.
+    :type no_print: bool (optional)
+ :param luxe: The `luxe` parameter is a boolean flag that determines whether to use a more luxurious
+ and colorful output format. If `luxe` is set to `True`, the output will include random colors,
+ emojis, and matrix-like characters.
+ :type luxe: bool (optional)
+ :return: The function `_f` returns a formatted string if the `no_print` parameter is set to `True`.
+ If `no_print` is `False`, the function prints the formatted string and returns `None`.
+ """
+ tags = [
+ ("FATAL", "☠️", "\033[91m"), # Red color for FATAL
+ ("WARN", "🚨", "\033[93m"), # Yellow color for WARN
+ ("INFO", "ℹ️", "\033[94m"), # Blue color for INFO
+ ("WAIT", "☕️", "\033[96m"), # Cyan color for WAIT
+ ("SUCCESS", "🌊", "\033[92m"), # Green color for SUCCESS
+ ]
+ _luxe = [
+ "\033[31m",
+ "\033[32m",
+ "\033[33m",
+ "\033[34m",
+ "\033[35m",
+ "\033[36m",
+ "\033[91m",
+ "\033[92m",
+ "\033[93m",
+ "\033[94m",
+ "\033[95m",
+ "\033[96m",
+ ]
+ _matrix = ["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"]
+ _joy = [
+ "🍤",
+ "🌈",
+ "📊",
+ "🏁",
+ "🌊",
+ "🧠",
+ "✨",
+ "🧮",
+ "🎉",
+ "🥳",
+ "🤩",
+ "🐈",
+ "❤️",
+ "💙",
+ "💜",
+ "💚",
+ "💛",
+ "🧡",
+ "⭐️",
+ ]
+ matching_tags = [x for x in tags if x[0] == tag.upper()]
+ if matching_tags:
+ tag_text = matching_tags[0][0]
+ emoji = matching_tags[0][1]
+ color_code = matching_tags[0][2]
+ if luxe:
+ return (
+ f"{_luxe[random.randint(0,len(_luxe)-1)]} {_joy[random.randint(0,len(_joy)-1)]} {_matrix[random.randint(0,len(_matrix)-1)]}: {body}\033[0m"
+ if no_print
+ else print(
+ f"{_luxe[random.randint(0,len(_luxe)-1)]} {_joy[random.randint(0,len(_joy)-1)]} {_matrix[random.randint(0,len(_matrix)-1)]}: {body}\033[0m"
+ )
+ )
+ else:
+ return (
+ f"{color_code} {emoji} {tag_text}: {body}\033[0m"
+ if no_print
+ else print(f"{color_code}{emoji} {tag_text}: {body}\033[0m")
+ )
+ else:
+ print(f"😭 UNKNOWN TAG - `{tag}`")
+
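+# usage sketch: one call per tag; no_print=True returns the formatted string
+# >>> _f('info', 'starting up')      # ℹ️ INFO: starting up
+# >>> _f('success', 'connected')     # 🌊 SUCCESS: connected
+# >>> _f('warn', 'low disk space', no_print=True)  # returns the string instead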
+
+[docs]
+class Utils:
+ """
+ The `Utils` class provides various utility functions for tasks such as checking CUDA availability, normalizing text, and uploading files to Amazon S3.
+
+ Example Usage:
+ # Create an instance of the Utils class
+ utils = Utils()
+
+ # Check if CUDA is available
+ utils.check_cuda()
+
+ # Normalize a text string
+ normalized_text = utils.normalize_text(" This is a sample text. ")
+
+ # Upload a file to Amazon S3
+ utils.upload_to_s3(file_or_dir="path/to/file.txt", keys=("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"), bucket="my-bucket", bucket_path="data")
+
+ Main functionalities:
+ - Checking if CUDA is available on the machine
+ - Normalizing text by performing various cleaning operations
+ - Uploading files to Amazon S3
+
+ Methods:
+ - __init__(): Initializes the Utils class and sets up the spaCy English language model with a sentence tokenizer.
+ - check_cuda(): Checks if CUDA is available on the machine and provides additional information if it is.
+ - normalize_text(_): Cleans a text string by removing whitespace, replacing characters, and removing curly braces.
+ - upload_to_s3(file_or_dir, keys, bucket, bucket_path): Uploads a file or directory to an Amazon S3 bucket using the provided access keys and bucket information.
+
+ Fields:
+ The Utils class does not have any fields.
+ """
+
+ def __init__(self):
+ nlp = English()
+ nlp.add_pipe("sentencizer")
+ self.nlp = nlp
+
+
+[docs]
+ def check_cuda(self):
+ """
+ The function checks if CUDA is available on the machine and provides additional information if
+ it is.
+ """
+ if torch.cuda.is_available():
+ _f("success", "CUDA is available on this machine.")
+ # You can also print additional information like the number of available GPUs:
+ _f("info",
+ f"Number of available GPUs - {torch.cuda.device_count()}")
+ # To get the name of the GPU:
+ _f(
+ "info", f"GPU Name - {torch.cuda.get_device_name(0)}"
+ ) # 0 is the GPU index
+ return True
+ else:
+ _f("warn", "CUDA is not available on this machine.")
+
+
+
+[docs]
+ def normalize_text(self, _):
+ """
+ The `clean` function takes a string as input and performs various cleaning operations on it,
+ such as removing whitespace, replacing characters, and removing curly braces.
+
+ :param _: The parameter "_" is a placeholder for the input string that needs to be cleaned
+ :return: a cleaned version of the input string.
+ """
+ if isinstance(_, list):
+ _f(
+ "warn",
+ f"this item may not process properly because it is a list: \n{_}",
+ )
+ try:
+ if not isinstance(_, str):
+ _f('warn', f'non-string found {type(_)}')
+            _ = _.strip()
+            if not _:
+                return ""
+            _ = _.replace('.', '') if (
+                _.count('.') / len(_)) * 100 > 0.3 else _
+
+ # Check if more than 20% of the string is integers
+ num_digits = sum(1 for char in _ if char.isdigit())
+ if (num_digits / len(_)) >= 0.2:
+ _ = "" # Replace with empty quotes
+ else:
+ _ = _.replace('"', "'")
+
+ # Check if the string has at least 5 words
+ words = _.split()
+ if len(words) < 5:
+ _ = "" # Replace with empty quotes if fewer than 5 words
+
+ _ = _.replace("\t", "")
+ _ = _.replace("\n", "")
+ _ = _.replace("\xa0", "")
+ _ = " ".join(_.split())
+ _ = re.sub(r" {[^}]*}", "", _)
+ return str(_)
+ except Exception as e:
+ _f("fatal", e)
+
+
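+# usage sketch: whitespace is collapsed, and period-heavy strings lose
+# their periods entirely
+# >>> Utils().normalize_text("  This   is a sample sentence for cleaning.\n")
+# 'This is a sample sentence for cleaning'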
+
+[docs]
+ def upload_to_s3(
+ self,
+ file_or_dir: str = None,
+ keys: tuple = ("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"),
+ bucket: str = None,
+ bucket_path: str = None,
+ ):
+ """
+ Uploads a file or directory to an Amazon S3 bucket.
+
+ Args:
+ file_or_dir (str): The path of the file or directory to be uploaded.
+ keys (tuple): A tuple containing the access keys (access key ID and secret access key) required to access the Amazon S3 bucket. Default is ("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY").
+ bucket (str): The name of the Amazon S3 bucket where the file or directory will be uploaded.
+ bucket_path (str): The path within the bucket where the file or directory will be uploaded.
+
+ Returns:
+ None
+
+ Raises:
+ None
+
+ Example Usage:
+ # Create an instance of the Utils class
+ utils = Utils()
+
+ # Upload a file to Amazon S3
+ utils.upload_to_s3(file_or_dir="path/to/file.txt", keys=("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"), bucket="my-bucket", bucket_path="data")
+ """
+ aws_access_key_id, aws_secret_access_key = keys
+
+ s3 = boto3.client(
+ "s3",
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ )
+ _f("warn", f"uploading to S3 - {file_or_dir}")
+ if os.path.isfile(os.path.abspath(file_or_dir)):
+ s3.upload_file(
+ os.path.abspath(
+ file_or_dir), bucket, f'{bucket_path}/{file_or_dir.split("/")[-1]}'
+ )
+ _f("success",
+ f'uploaded - {bucket}/{bucket_path}/{file_or_dir.split("/")[-1]}')
+ elif os.path.isdir(os.path.abspath(file_or_dir)):
+ for filename in os.listdir(file_or_dir):
+ f = os.path.join(file_or_dir, filename)
+ s3.upload_file(
+ os.path.abspath(
+ f), bucket, f'{bucket_path}/{f.split("/")[-1]}'
+ )
+ _f("success",
+ f'uploaded - {bucket}/{bucket_path}/{f.split("/")[-1]}')
+
+
+
+import requests
+import json
+
+
+[docs]
+class InferenceAPI:
+ """
+ A class that provides a convenient way to make HTTP POST requests to an inference API endpoint.
+
+ Attributes:
+ token (str): A string representing the token used for authorization.
+ """
+
+ def __init__(self, token):
+ """
+ Initializes the InferenceAPI class with a token.
+
+ Args:
+ token (str): A string representing the token used for authorization.
+ """
+ self.token = token
+
+
+[docs]
+ def invoke(self, payload):
+ """
+ Makes an HTTP POST request to an inference API endpoint and returns the response.
+
+ Args:
+ payload (str): A JSON string representing the payload to be sent to the inference API. It should contain the model name and input data.
+
+ Returns:
+ str: A JSON string representing the response from the inference API.
+ """
+ payload = json.loads(payload)
+ headers = {"Authorization": f"Bearer {self.token}"}
+ response = requests.post(f"https://api-inference.huggingface.co/models/{payload['model']}", headers=headers, json=payload)
+ return response.json()
+
+
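+# usage sketch: the token and model name below are placeholders
+# >>> api = InferenceAPI(token='hf_...')
+# >>> api.invoke(json.dumps({
+# ...     'model': 'mistralai/Mistral-7B-Instruct-v0.1',
+# ...     'inputs': 'Explain magnets in one sentence.'
+# ... }))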
+
+from rich.markdown import Markdown
+from rich.console import Console
+
+
+
+
+from pymilvus import connections, utility, FieldSchema, CollectionSchema, DataType, Collection
+from magnet.utils.globals import _f
+import random
+
+
+[docs]
+class MilvusDB:
+ """
+ A class that provides a high-level interface for interacting with a Milvus database.
+
+ Args:
+ config (dict): A configuration dictionary that contains the necessary parameters for connecting to the Milvus server and creating a collection.
+
+ Attributes:
+ config (dict): A configuration dictionary that contains the necessary parameters for connecting to the Milvus server and creating a collection.
+ fields (list): A list of `FieldSchema` objects that define the fields of the collection.
+ schema (CollectionSchema): A `CollectionSchema` object that represents the schema of the collection.
+ index_params (dict): A dictionary that contains the parameters for creating an index on the collection.
+ connection: The connection object to the Milvus server.
+ collection: The collection object in MilvusDB.
+
+ """
+
+ def __init__(self, config):
+ self.config = config
+ self.fields = [
+ FieldSchema(name='id', dtype=DataType.INT64, is_primary=True, auto_id=True),
+ FieldSchema(name='document', dtype=DataType.VARCHAR, max_length=4096),
+ FieldSchema(name='text', dtype=DataType.VARCHAR, max_length=65535),
+ FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, dim=self.config['DIMENSION'])
+ ]
+ self.schema = CollectionSchema(fields=self.fields)
+ self.index_params = self.config['INDEX_PARAMS']
+
+
+[docs]
+ def on(self):
+ """
+ Establishes a connection to the Milvus server and creates a collection object.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If an error occurs during the connection to the Milvus server.
+
+ """
+        try:
+            if 'MILVUS_USER' in self.config and 'MILVUS_PASSWORD' in self.config:
+                self.connection = connections.connect(
+                    host=self.config['MILVUS_URI'],
+                    port=self.config['MILVUS_PORT'],
+                    user=self.config['MILVUS_USER'],
+                    password=self.config['MILVUS_PASSWORD'],
+                    alias='magnet'
+                )
+            else:
+                self.connection = connections.connect(
+                    host=self.config['MILVUS_URI'],
+                    port=self.config['MILVUS_PORT'],
+                    alias='magnet'
+                )
+ _f('success', f"connected successfully to {self.config['MILVUS_URI']}")
+ except Exception as e:
+ _f('fatal', e)
+
+
+
+[docs]
+ def off(self):
+ """
+ Disconnects from the Milvus server.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If an error occurs during the disconnection from the Milvus server.
+
+ """
+ try:
+ self.connection = connections.disconnect(alias="magnet")
+ _f('warn', f"disconnected from {self.config['MILVUS_URI']}")
+ except Exception as e:
+ _f('fatal', e)
+
+
+
+[docs]
+ def create(self, overwrite=False):
+ """
+ Create a collection in MilvusDB and create an index on a specific field of the collection.
+
+ Args:
+ overwrite (bool, optional): A boolean flag indicating whether to overwrite the existing collection with the same name. Default is False.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If an error occurs during the creation of the collection or index.
+
+ Example Usage:
+ config = {
+ 'MILVUS_URI': 'localhost',
+ 'INDEX': 'my_collection',
+ 'DIMENSION': 128,
+ 'INDEX_PARAMS': {'index_type': 'IVF_FLAT', 'nlist': 100}
+ }
+ milvus_db = MilvusDB(config)
+ milvus_db.create(overwrite=True)
+
+ """
+ if utility.has_collection(self.config['INDEX'], using="magnet") and overwrite:
+ utility.drop_collection(self.config['INDEX'], using="magnet")
+ try:
+ self.collection = Collection(name=self.config['INDEX'], schema=self.schema, using="magnet")
+ self.collection.create_index(field_name="embedding", index_params=self.index_params)
+ _f('success', f"{self.config['INDEX']} created")
+ except Exception as e:
+ _f('fatal', e)
+
+
+
+[docs]
+ def load(self):
+ self.collection = Collection(name=self.config['INDEX'], schema=self.schema, using='magnet')
+ self.collection.load()
+
+
+
+[docs]
+ def initialize(self, user: str = 'magnet', password: str = '33011033'):
+ try:
+ _f('warn', f"initializing {self.config['MILVUS_URI']} using `root` to create '{user}'")
+ utility.reset_password('root', 'Milvus', self._pw(), using='magnet')
+ _f('success', f"secured root user successfully on {self.config['MILVUS_URI']}")
+ utility.create_user(user, password, using='magnet')
+ _f('success', f"created requested {user} on {self.config['MILVUS_URI']}")
+ try:
+ self.off()
+                self.connection = connections.connect(
+                    host=self.config['MILVUS_URI'],
+                    port=self.config['MILVUS_PORT'],
+                    user=user,
+                    password=password,
+                    alias='magnet'
+                )
+ _f('success', 'Milvus has been initialized with your new credentials')
+ except Exception as e:
+ _f('fatal', e)
+ except Exception as e:
+ _f('fatal', e)
+
+
+
+[docs]
+ def delete_index(self):
+ """
+ Deletes the index of a collection in MilvusDB.
+
+ Returns:
+ None
+
+ """
+        if utility.has_collection(self.config['INDEX'], using='magnet'):
+            utility.drop_collection(self.config['INDEX'], using='magnet')
+ _f('warn', f"{self.config['INDEX']} deleted")
+
+
+ def _pw(self):
+ MAX_LEN = 24
+ DIGITS = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
+ LOCASE_CHARACTERS = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h',
+ 'i', 'j', 'k', 'm', 'n', 'o', 'p', 'q',
+ 'r', 's', 't', 'u', 'v', 'w', 'x', 'y',
+ 'z']
+ UPCASE_CHARACTERS = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H',
+ 'I', 'J', 'K', 'M', 'N', 'O', 'P', 'Q',
+ 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y',
+ 'Z']
+ SYMBOLS = ['@', '#', '$', '(', ')']
+ COMBINED_LIST = DIGITS + UPCASE_CHARACTERS + LOCASE_CHARACTERS + SYMBOLS
+ rand_digit = random.choice(DIGITS)
+ rand_upper = random.choice(UPCASE_CHARACTERS)
+ rand_lower = random.choice(LOCASE_CHARACTERS)
+ rand_symbol = random.choice(SYMBOLS)
+ temp_pass = rand_digit + rand_upper + rand_lower + rand_symbol
+ for x in range(MAX_LEN - 4):
+ temp_pass = temp_pass + random.choice(COMBINED_LIST)
+        temp_pass_list = list(temp_pass)
+        random.shuffle(temp_pass_list)
+        password = "".join(temp_pass_list)
+ _f('warn', f'Your Milvus `root` user password is now {password}')
+ return password
+
+
+
+[docs]
+class Prompts:
+ """
+ The `Prompts` class is used to generate a formatted prompt for generating an answer based on a given question and a list of documents.
+ """
+
+ def __init__(self):
+ """
+ Initializes an instance of the `Prompts` class.
+ """
+ return
+
+
+[docs]
+    def qa_ref(self, docs: list = None, q: str = None) -> str:
+ """
+ Generates a formatted prompt for generating an answer based on the given documents and question.
+
+ Args:
+ docs (list): A list of dictionaries representing the documents. Each dictionary should have a "title" key and a "content" key.
+ q (str): The question for which an answer is to be generated.
+
+ Returns:
+ str: A formatted string representing a prompt for generating an answer based on the given documents and question.
+ """
+        docs = docs or []
+        docs = '\n'.join([f"Document[name={d}]: {t}\n\n" for (t, d, c) in [tuple(x.values()) for x in docs]])
+ return """Create a concise and informative answer (no more than 200 words) for a given question based solely on the given documents. You must only use information from the given documents. Use an unbiased and journalistic tone. Do not repeat text. Cite the documents using Document[name] notation. If multiple documents contain the answer, cite those documents like ‘as stated in Document[name], Document[name], etc.’. If the documents do not contain the answer to the question, say that ‘answering is not possible given the available information.’
+ [DOCUMENTS]
+ Question: [QUERY]; Answer: """ \
+ .replace('[DOCUMENTS]', docs) \
+ .replace('[QUERY]', q)
+
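+# usage sketch: each doc dict is ordered (text, document, distance), matching
+# what Embedder.search returns
+# >>> docs = [{'text': 'Magnets attract iron.', 'document': 'physics.txt', 'distance': 0.12}]
+# >>> print(Prompts().qa_ref(docs=docs, q='What do magnets attract?'))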
+
+
don’t forget to check out the helpful 💻 examples
+the small distributed language model toolkit
+ +⚡️ fine-tune state-of-the-art LLMs anywhere, rapidly ⚡️
+pip install llm-magnet
+
or
+python3 setup.py install
+
⚡️ It’s Fast
+fast on consumer hardware
very fast on Apple Silicon
extremely fast on ROCm/CUDA
🫵 Automatic or your way
+rely on established transformer patterns to let magnet do the work
+keep your existing data processing functions, bring them to magnet!
🛰️ 100% Distributed
+processing, embedding, storage, retrieval, querying, or inference +from anywhere
as much or as little compute as you need
🧮 Choose Inference Method
+HuggingFace
vLLM node
GPU
mlx
🌎 Huge Volumes
+handle gigantic amounts of data inexpensively
fault-tolerant by design
decentralized workloads
🔐 Secure
+JWT
Basic
🪵 World-Class Comprehension
+magnet optionally logs its own code as it’s executed (yes, really)
build a self-aware system and allow it to learn from itself
emojis are the future
learn more about how Prismadic uses 🧲
+subscribe to our substack
++
EmbeddingPayload
+GeneratedPayload
+JobParams
JobParams.embedding_model
JobParams.generation_model
JobParams.job_range
JobParams.job_type
JobParams.milvus_collection
JobParams.milvus_host
JobParams.milvus_password
JobParams.milvus_port
JobParams.milvus_username
JobParams.nats_category
JobParams.nats_host
JobParams.nats_password
JobParams.nats_stream
JobParams.nats_username
MistralArgs
+Payload
+InferenceAPI
+m_print()
Prompts
+Bases: object
The Charge class is responsible for connecting to a NATS server, managing streams and categories, and publishing data to the server.
+server (str): The NATS server URL.
+server (str): The NATS server URL. +category (str): The current category. +stream (str): The current stream. +nc: The NATS connection object. +js: The JetStream API object.
+Deletes the specified stream if the name matches the current stream, or prints an error message if the name doesn’t match or the stream doesn’t exist.
+name (str, optional): The name of the stream to delete. Defaults to None.
+Publishes a job to the NATS server using the current category.
+job (dict, optional): The job data to be published. Defaults to an empty dict.
+Connects to the NATS server, creates a stream and category if they don’t exist, and prints a success message.
+category (str, optional): The category to connect to. Defaults to ‘no_category’. +stream (str, optional): The stream to connect to. Defaults to ‘documents’. +create (bool, optional): Whether to create the stream and category if they don’t exist. Defaults to False.
+Bases: object
The Resonator class is responsible for connecting to a NATS server, subscribing to a specific category in a stream, and consuming messages from that category.
+server (str): The address of the NATS server.
+server (str): The address of the NATS server. +category (str): The category to subscribe to. +stream (str): The stream to subscribe to. +session (str): The session name for durable subscriptions. +nc (nats.aio.client.Client): The NATS client. +js (nats.aio.client.JetStream): The JetStream client. +sub (nats.aio.client.Subscription): The subscription.
+__init__(self, server: str) +on(self, category: str = ‘no_category’, stream: str = ‘documents’, cb=print, session=’magnet’) -> None +info(self, session: str = None) -> None +off(self) -> None
+Retrieves information about a consumer in a JetStream stream.
session – A string representing the session (durable consumer) name whose configuration should be retrieved.
+None
+Consume messages from a specific category in a stream.
+cb (function, optional): The callback function to process the received messages. Defaults to print.
+None
+Exception: If there is an error in consuming the message or processing the callback function.
+Unsubscribes from the category and stream and disconnects from the NATS server.
+None
+Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
+category (str, optional): The category to subscribe to. Defaults to ‘no_category’. +stream (str, optional): The stream to subscribe to. Defaults to ‘documents’. +session (str, optional): The session name for durable subscriptions. Defaults to ‘magnet’.
+None
+TimeoutError: If there is a timeout error while connecting to the NATS server. +Exception: If there is an error in consuming the message or processing the callback function.
+Consume messages from a specific category in a stream and process them as jobs.
+cb (function, optional): The callback function to process the received messages. Defaults to print.
+None
+Exception: If there is an error in consuming the message or processing the callback function.
+Bases: object
The Processor class is responsible for loading, processing, and saving data. It provides methods for loading data from different file formats, splitting the data into smaller chunks, and saving the processed data to a file.
+# Initialize the Processor class object +processor = Processor()
+# Load data into the processor +processor.load(raw_data, text_column=”clean”, id_column=”id”)
+# Process and split the loaded data into smaller chunks +await processor.process(path=”processed_data.csv”, splitter=None, nlp=True)
+# Save the processed data to a file +processor.save(filename=”processed_data.csv”, raw=processor.df)
+Loading data from different file formats (csv, json, xlsx, parquet) or a pandas DataFrame
Splitting the loaded data into smaller chunks either by sentences using natural language processing (NLP) or by a fixed character length
Saving the processed data to a file in different formats (csv, json, xlsx, parquet)
__init__(): Initializes the Processor class object and sets the df attribute to None and creates an instance of the Utils class.
save(filename: str = None, raw: pd.DataFrame = None): Saves the pandas DataFrame to a file with the specified filename and file format.
load(raw: str | pd.DataFrame = None, text_column: str = “clean”, id_column: str = ‘id’): Loads data into the df attribute of the Processor class.
process(path: str = None, splitter: any = None, nlp=True): Processes and splits the loaded data into smaller chunks.
default_splitter(data, window_size=768, overlap=76, nlp=True): Splits the given input data into smaller chunks either by sentences or by a fixed character length.
df: A pandas DataFrame that stores the loaded data.
utils: An instance of the Utils class that provides utility functions.
Process and send data to a field.
+field (optional): The field object to which the data will be sent.
+None
+ValueError: If no data is loaded. +ValueError: If no field is loaded.
+# Initialize the Processor class object +processor = Processor()
+# Load data into the processor +processor.load(raw_data, text_column=”clean”, id_column=”id”)
+# Initialize the field object +field = Field()
+# Create charges and send data to the field +await processor.create_charge(field)
+Splits the given input data into smaller chunks either by sentences or by a fixed character length.
+data (str): The input data to be split. +window_size (int, optional): The size of each chunk when splitting by a fixed character length. Defaults to 768. +overlap (int, optional): The number of characters to overlap between each chunk when splitting by a fixed character length. Defaults to 76. +nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length. Defaults to True.
+list: If nlp is True, returns a list of sentences extracted from the input data. If nlp is False, returns a list of chunks obtained by splitting the input data into fixed-sized chunks.
+Load data into the df attribute of the Processor class.
+raw (str | pd.DataFrame): The input data to be loaded. It can be either a file path (str) or a pandas DataFrame. +text_column (str): The name of the column in the input data that contains the text data. Default is “clean”. +id_column (str): The name of the column in the input data that contains the unique identifier for each data entry. Default is “id”.
+ValueError: If the file extension is not supported or an exception occurs during the loading process.
+None: If an error occurs during the loading process. +str: Success message if the data is successfully loaded.
+Process and split the loaded data into smaller chunks.
+path (str): The path to save the processed data. +splitter (function, optional): A custom function to split the text into chunks. If not provided, the default splitter function in the Processor class will be used. +nlp (bool, optional): A flag indicating whether to split the data into sentences using natural language processing (NLP) or by a fixed character length.
+None: If there is no data loaded or if an error occurs during the processing. +The processed data saved to the specified file path.
+Save the pandas DataFrame to a file with the specified filename and file format.
+filename (str): The name of the file to save the data to. +raw (pd.DataFrame): The pandas DataFrame containing the data to be saved.
+ValueError: If the data format is unsupported or an error occurs during the saving process.
+None: If an error occurs during the saving process. +str: Success message if the data is successfully saved to the specified file.
+Bases: object
The Embedder class is responsible for embedding text using a pre-trained sentence transformer model and storing or sending the embeddings for further processing. It utilizes the Milvus database for storing and searching the embeddings.
+config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters. +create (bool, optional): If set to True, a connection to the Milvus database will be created. Defaults to False.
+config (dict): A dictionary containing the configuration parameters for the Embedder class, including the model name, index name, and index parameters. +model (SentenceTransformer): An instance of the SentenceTransformer class from the sentence_transformers library, used for text embedding. +db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database.
+embed_and_store(self, payload, verbose=False, field=None): Embeds the given payload using a pre-trained sentence transformer model and stores it in the Milvus database. +embed_and_charge(self, payload, verbose=False, field=None): Embeds the given payload using a pre-trained sentence transformer model and sends it to a specified field for further processing.
+Delete an index from the Milvus database.
+name (str, optional): The name of the index to be deleted. If not provided, no index will be deleted.
+None
+Exception: If an error occurs during the deletion process.
+‘MODEL’: ‘bert-base-nli-mean-tokens’, +‘INDEX’: ‘my_index’, +‘INDEX_PARAMS’: {‘nprobe’: 16}
+} +embedder = Embedder(config) +embedder.delete(‘my_index’)
+Embeds the given payload using a pre-trained sentence transformer model and stores it in a Milvus database.
+payload (object): An object containing the text and document attributes to be embedded and stored. +verbose (bool, optional): A boolean indicating whether additional information should be logged during the embedding process. Defaults to False. +field (object, optional): An object representing the field to which the encoded payload will be sent for further processing.
+None
+Exception: If an error occurs during the embedding or storing process.
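+A hedged usage sketch for embed_and_store, following the signature above; the module path (magnet.ize.memory) and the Payload import are assumptions based on the module index, and the call may need to be awaited in your version.
+from magnet.ize.memory import Embedder           # path assumed from the index
+from magnet.utils.data_classes import Payload
+
+config = {
+    'MODEL': 'bert-base-nli-mean-tokens',
+    'INDEX': 'my_index',
+    'INDEX_PARAMS': {'nprobe': 16},
+}
+embedder = Embedder(config, create=True)         # create=True opens the Milvus connection
+payload = Payload(text='Some passage of text.', document='doc-001')
+embedder.embed_and_store(payload, verbose=True)  # await this if it is a coroutine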
+Check if a given query is a duplicate in the Milvus database.
+q (object): The query embedding to check for duplicates.
+bool: True if the query embedding is a duplicate in the Milvus database, False otherwise.
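+Continuing the sketch above, a typical guard before storing; the method name is_dupe is an assumption, as is encoding the query through the embedder's model attribute.
+vector = embedder.model.encode('Some passage of text.')  # model attribute per the docs above
+if not embedder.is_dupe(q=vector):                       # method name assumed
+    embedder.embed_and_store(payload)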
+Search for relevant passages based on a given input payload.
+payload (str): The search query or input payload.
+limit (int, optional): The maximum number of results to return. Defaults to 100.
+cb (function, optional): A callback function to process the results. Defaults to None.
+list: A list of dictionaries, each containing the text, document, and distance of a relevant passage.
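+A hedged retrieval sketch; the method name search is an assumption, while the result keys (text, document, distance) follow the return description above.
+hits = embedder.search('what were the quarterly earnings?', limit=5)
+for hit in hits:
+    print(hit['document'], hit['distance'], hit['text'][:80])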
+Bases: object
Represents a payload for embedding text data.
+document (str): The document associated with the text data.
+embedding (list): The embedding of the text data.
+text (list): The text of the data.
+model (str): The model used for embedding the text data.
+Bases: object
Represents a payload generated by a system.
+query (str): The query associated with the payload.
+prompt (str): The prompt associated with the payload.
+context (list): The context associated with the payload.
+result (str): The result generated by the system.
+model (str): The model used to generate the payload.
+Bases: object
Represents the parameters for a job, including the embedding and generation models, the job range and type, and Milvus and NATS connection settings.
Bases: object
Represents a set of arguments for the Mistral model.
+dim (int): The dimensionality of the model.
+n_layers (int): The number of layers in the model.
+head_dim (int): The dimensionality of each attention head.
+hidden_dim (int): The dimensionality of the hidden layer in the feed-forward network.
+n_heads (int): The number of attention heads.
+n_kv_heads (int): The number of attention heads used for key-value attention.
+norm_eps (float): The epsilon value used for numerical stability in layer normalization.
+vocab_size (int): The size of the vocabulary used in the model.
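+For illustration, a MistralArgs instance populated with values in the range published for Mistral-7B; treat the numbers as placeholders rather than a verified configuration.
+args = MistralArgs(
+    dim=4096,          # model width
+    n_layers=32,
+    head_dim=128,
+    hidden_dim=14336,  # feed-forward width
+    n_heads=32,
+    n_kv_heads=8,      # grouped key-value heads
+    norm_eps=1e-5,
+    vocab_size=32000,
+)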
+Bases: object
Represents a payload with two main fields: text and document.
+text (str): The text associated with the payload.
+document (str): The document associated with the payload.
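+A short sketch of how the payload dataclasses relate, assuming they are plain dataclasses importable from magnet.utils.data_classes (per the module index): a raw Payload carries text plus its source document, and an EmbeddingPayload wraps the same document together with its vector.
+from magnet.utils.data_classes import Payload, EmbeddingPayload
+
+raw = Payload(text='Some passage.', document='doc-001')
+emb = EmbeddingPayload(
+    document=raw.document,
+    embedding=[0.12, -0.08],   # truncated for illustration
+    text=[raw.text],
+    model='bert-base-nli-mean-tokens',
+)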
+Bases: object
The Utils class provides various utility functions for tasks such as checking CUDA availability, normalizing text, and uploading files to Amazon S3.
+# Create an instance of the Utils class
+utils = Utils()
+# Check if CUDA is available
+utils.check_cuda()
+# Normalize a text string
+normalized_text = utils.normalize_text(" This is a sample text. ")
+# Upload a file to Amazon S3
+utils.upload_to_s3(file_or_dir="path/to/file.txt", keys=("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"), bucket="my-bucket", bucket_path="data")
+Main functionalities:
+- Checking if CUDA is available on the machine
+- Normalizing text by performing various cleaning operations
+- Uploading files to Amazon S3
+Methods:
+- __init__(): Initializes the Utils class and sets up the spaCy English language model with a sentence tokenizer.
+- check_cuda(): Checks if CUDA is available on the machine and provides additional information if it is.
+- normalize_text(_): Cleans a text string by removing whitespace, replacing characters, and removing curly braces.
+- upload_to_s3(file_or_dir, keys, bucket, bucket_path): Uploads a file or directory to an Amazon S3 bucket using the provided access keys and bucket information.
+Fields:
+The Utils class does not have any fields.
+Checks whether CUDA is available on the machine and reports additional information when it is.
+Cleans a text string by removing whitespace, replacing characters, and removing curly braces.
+_ – The input string to be cleaned; the underscore is used as the parameter name.
+A cleaned version of the input string.
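+A minimal re-implementation sketch of the cleaning behaviour described above; the real normalize_text may replace additional characters.
+import re
+
+def normalize_text_sketch(s: str) -> str:
+    s = re.sub(r'\s+', ' ', s)               # collapse runs of whitespace
+    s = s.replace('{', '').replace('}', '')  # remove curly braces
+    return s.strip()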
+Uploads a file or directory to an Amazon S3 bucket.
+file_or_dir (str): The path of the file or directory to be uploaded.
+keys (tuple): A tuple containing the access keys (access key ID and secret access key) required to access the Amazon S3 bucket. Default is ("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY").
+bucket (str): The name of the Amazon S3 bucket where the file or directory will be uploaded.
+bucket_path (str): The path within the bucket where the file or directory will be uploaded.
+None
+None
+# Create an instance of the Utils class
+utils = Utils()
+# Upload a file to Amazon S3
+utils.upload_to_s3(file_or_dir="path/to/file.txt", keys=("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"), bucket="my-bucket", bucket_path="data")
+Bases: object
A class that provides a convenient way to make HTTP POST requests to an inference API endpoint.
+token (str): A string representing the token used for authorization.
+Makes an HTTP POST request to an inference API endpoint and returns the response.
+payload (str): A JSON string representing the payload to be sent to the inference API. It should contain the model name and input data.
+str: A JSON string representing the response from the inference API.
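+A hedged usage sketch; the request method name (invoke) and the exact payload keys are assumptions beyond "model name and input data", and the token shown is a placeholder.
+import json
+from magnet.utils.huggingface import InferenceAPI  # path assumed from the index
+
+api = InferenceAPI(token='hf_...')  # placeholder authorization token
+payload = json.dumps({
+    'model': 'mistralai/Mistral-7B-Instruct-v0.1',  # placeholder model name
+    'inputs': 'Summarize the filing in one sentence.',
+})
+response = json.loads(api.invoke(payload))  # method name assumed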
+Bases: object
A class that provides a high-level interface for interacting with a Milvus database.
+config (dict): A configuration dictionary that contains the necessary parameters for connecting to the Milvus server and creating a collection.
+config (dict): A configuration dictionary that contains the necessary parameters for connecting to the Milvus server and creating a collection.
+fields (list): A list of FieldSchema objects that define the fields of the collection.
+schema (CollectionSchema): A CollectionSchema object that represents the schema of the collection.
+index_params (dict): A dictionary that contains the parameters for creating an index on the collection.
+connection: The connection object to the Milvus server.
+collection: The collection object in MilvusDB.
+Create a collection in MilvusDB and create an index on a specific field of the collection.
+overwrite (bool, optional): A boolean flag indicating whether to overwrite the existing collection with the same name. Default is False.
+None
+Exception: If an error occurs during the creation of the collection or index.
+config = {
+    'MILVUS_URI': 'localhost',
+    'INDEX': 'my_collection',
+    'DIMENSION': 128,
+    'INDEX_PARAMS': {'index_type': 'IVF_FLAT', 'nlist': 100}
+}
+milvus_db = MilvusDB(config)
+milvus_db.create(overwrite=True)
+Bases: object
The Prompts class is used to generate a formatted prompt for generating an answer based on a given question and a list of documents.
+Generates a formatted prompt for generating an answer based on the given documents and question.
+docs (list): A list of dictionaries representing the documents. Each dictionary should have a "title" key and a "content" key.
+q (str): The question for which an answer is to be generated.
+str: A formatted string representing a prompt for generating an answer based on the given documents and question.
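+A hedged sketch of prompt generation; the method name qa_ref is an assumption, while the docs and q parameters follow the signature described above.
+from magnet.utils.prompts import Prompts  # path assumed from the index
+
+docs = [{'title': '10-K excerpt', 'content': 'Revenue grew 12% year over year.'}]
+prompt = Prompts().qa_ref(docs=docs, q='How fast did revenue grow?')  # method name assumed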
+EmbeddingPayload
+GeneratedPayload
+JobParams
+    JobParams.embedding_model
+    JobParams.generation_model
+    JobParams.job_range
+    JobParams.job_type
+    JobParams.milvus_collection
+    JobParams.milvus_host
+    JobParams.milvus_password
+    JobParams.milvus_port
+    JobParams.milvus_username
+    JobParams.nats_category
+    JobParams.nats_host
+    JobParams.nats_password
+    JobParams.nats_stream
+    JobParams.nats_username
+MistralArgs
+Payload
+InferenceAPI
+m_print()
+Prompts
+m
+    magnet
+    magnet.ic.field
+    magnet.ize.filings
+    magnet.ize.memory
+    magnet.utils.data_classes
+    magnet.utils.globals
+    magnet.utils.huggingface
+    magnet.utils.markdown
+    magnet.utils.milvus
+    magnet.utils.prompts