diff --git a/.buildinfo b/.buildinfo new file mode 100644 index 0000000..5672f25 --- /dev/null +++ b/.buildinfo @@ -0,0 +1,4 @@ +# Sphinx build info version 1 +# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done. +config: ced2ed8930753438f7bfc8caa2ddd2f8 +tags: 645f666f9bcd5a90fca523b33c5a78b7 diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md new file mode 100644 index 0000000..55b369d --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# GitHub Pages + +Last update of sphinx html documentation from [98accc6](https://github.com/Prismadic/magnet/tree/98accc69363772c2bf1ab6b5f0641945f30b2778) diff --git a/_modules/index.html b/_modules/index.html new file mode 100644 index 0000000..8329a64 --- /dev/null +++ b/_modules/index.html @@ -0,0 +1,267 @@
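+# ---- magnet/ic/field.py ----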
+import json, datetime, xxhash, platform
+
+from dataclasses import asdict
+from tabulate import tabulate
+
+from magnet.base import Magnet
+from magnet.utils.globals import _f
+from magnet.utils.data_classes import *
+
+from nats.errors import TimeoutError
+from nats.js.api import StreamConfig, ConsumerConfig
+from nats.js.errors import ServerError
+
+x = xxhash
+utc_timestamp = datetime.datetime.now(datetime.timezone.utc).timestamp()
+
+
+class Charge:
+ def __init__(self, magnet: Magnet):
+ self.magnet = magnet
+
+
+ async def list_streams(self):
+ try:
+ streams = await self.magnet.js.streams_info()
+ remote_streams = [x.config.name for x in streams]
+ remote_subjects = [x.config.subjects for x in streams]
+ # Collect display rows for tabulate
+ formatted_data = []
+
+ # Loop through each stream and its subjects
+ for stream, subjects in zip(remote_streams, remote_subjects):
+ # Check if the current stream name or any of the subjects match the config variables
+ match_stream_name = stream == self.magnet.config.stream_name
+ match_subject = self.magnet.config.category in subjects
+
+ # If there's a match, format the stream name and subjects with ANSI green color and a magnet emoji
+ if match_stream_name or match_subject:
+ formatted_stream = f"\033[92m{stream} \U0001F9F2\033[0m" # Green and magnet emoji for stream name
+ formatted_subjects = [f"\033[92m{subject} \U0001F9F2\033[0m" if subject == self.magnet.config.category else subject for subject in subjects]
+ else:
+ formatted_stream = stream
+ formatted_subjects = subjects
+
+ # Add the formatted stream and subjects to the list
+ formatted_data.append([formatted_stream, ', '.join(formatted_subjects)])
+
+ # Creating a table with the formatted data
+ table = tabulate(formatted_data, headers=['Stream Name', 'Subjects'], tablefmt="pretty")
+
+ _f("info", f'\n{table}')
+ except TimeoutError:
+ return _f('fatal', f'could not connect to {self.magnet.config.host}')
+ except Exception as e:
+ return _f('fatal', e)
+
+
+
+ async def on(self):
+ try:
+ streams = await self.magnet.js.streams_info()
+ remote_streams = [x.config.name for x in streams]
+ remote_subjects = [x.config.subjects for x in streams]
+ if self.magnet.config.stream_name not in remote_streams:
+ return _f('fatal', f'{self.magnet.config.stream_name} not found, initialize with `Magnet.align()` first')
+ elif self.magnet.config.category not in sum(remote_subjects, []):
+ if self.magnet.config.category not in sum([x.config.subjects for x in streams if x.config.name == self.magnet.config.stream_name], []):
+ try:
+ subjects = sum(
+ [x.config.subjects for x in streams if x.config.name == self.magnet.config.stream_name], [])
+ subjects.append(self.magnet.config.category)
+ await self.magnet.js.update_stream(StreamConfig(
+ name=self.magnet.config.stream_name
+ , subjects=subjects
+ ))
+ _f("success", f'created [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}')
+ except ServerError as e:
+ _f('fatal', f"couldn't create {self.magnet.config.stream_name} on {self.magnet.config.host}, ensure your `category` is set")
+ except TimeoutError:
+ return _f('fatal', f'could not connect to {self.magnet.config.host}')
+ _f("success", f'ready [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}')
+
+
+
+ async def off(self):
+ """
+ Disconnects from the NATS server and prints a warning message.
+ """
+ await self.magnet.nc.drain()
+ await self.magnet.nc.close()
+ _f('warn', f'disconnected from {self.magnet.config.host}')
+
+
+
+ async def pulse(self, payload: Payload | GeneratedPayload | EmbeddingPayload | JobParams = None, v=False):
+ """
+ Publishes data to the NATS server using the specified category and payload.
+
+ Args:
+ payload (Payload | GeneratedPayload | EmbeddingPayload | JobParams): The dataclass payload to publish.
+ v (bool, optional): If True, log a success message after publishing. Defaults to False.
+ """
+ try:
+ bytes_ = json.dumps(asdict(payload), separators=(
+ ', ', ':')).encode('utf-8')
+ except Exception as e:
+ return _f('fatal', f'invalid object, more info:\n{e} in [Payload, GeneratedPayload, EmbeddingPayload, JobParams]')
+ try:
+ _hash = x.xxh64(bytes_).hexdigest()
+ msg = await self.magnet.js.publish(
+ self.magnet.config.category, bytes_, headers={
+ "Nats-Msg-Id": _hash
+ }
+ )
+ _f('success', f'pulsed to {self.magnet.config.category} on {self.magnet.config.stream_name}') if v else None
+ _ts = datetime.datetime.now(datetime.timezone.utc)
+ msg.ts = _ts
+ return msg
+ except Exception as e:
+ return _f('fatal', f'could not pulse data to {self.magnet.config.host}\n{e}')
+
+
+
+ async def excite(self, job: dict = None):
+ """
+ Publishes job data to the NATS server using the configured category.
+
+ Args:
+ job (dict, optional): The data to be published. Defaults to None.
+ """
+ try:
+ bytes_ = json.dumps(job or {}, separators=(', ', ':')).encode('utf-8')
+ except Exception as e:
+ return _f('fatal', f'invalid JSON\n{e}')
+ try:
+ _hash = x.xxh64(bytes_).hexdigest()
+ await self.magnet.js.publish(
+ self.magnet.config.category, bytes_, headers={
+ "Nats-Msg-Id": _hash
+ }
+ )
+ except Exception as e:
+ _f('fatal', f'could not send data to {self.magnet.config.host}\n{e}')
+
+
+
+ async def emp(self, name=None):
+ """
+ Deletes the specified stream if the name matches the current stream, or prints an error message if the name doesn't match or the stream doesn't exist.
+
+ Args:
+ name (str, optional): The name of the stream to delete. Defaults to None.
+ """
+ if name and name == self.magnet.config.stream_name:
+ await self.magnet.js.delete_stream(name=self.magnet.config.stream_name)
+ _f('warn', f'{self.magnet.config.stream_name} stream deleted')
+ else:
+ _f('fatal', "name doesn't match the stream or stream doesn't exist")
+
+
+
+ async def reset(self, name=None):
+ """
+ Purges the specified category if the name matches the current category, or prints an error message if the name doesn't match or the category doesn't exist.
+
+ Args:
+ name (str, optional): The name of the category to purge. Defaults to None.
+ """
+ if name and name == self.magnet.config.category:
+ await self.magnet.js.purge_stream(name=self.magnet.config.stream_name, subject=self.magnet.config.category)
+ _f('warn', f'{self.magnet.config.category} category purged')
+ else:
+ _f('fatal', "name doesn't match the stream category or category doesn't exist")
+
+
+
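+
+ # --- usage sketch (illustrative; not part of the original module) ---
+ # A minimal flow for publishing with Charge, assuming you already hold a
+ # configured `Magnet` instance named `magnet` (names below are hypothetical):
+ #
+ #   import asyncio
+ #   from magnet.ic.field import Charge
+ #   from magnet.utils.data_classes import Payload
+ #
+ #   async def main(magnet):
+ #       charge = Charge(magnet)
+ #       await charge.list_streams()            # pretty-print streams on the server
+ #       await charge.on()                      # ensure the category exists on the stream
+ #       await charge.pulse(Payload(text="hello", document="doc-1"), v=True)
+ #       await charge.off()
+ #
+ #   asyncio.run(main(magnet))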
+
+class Resonator:
+ def __init__(self, magnet: Magnet):
+ """
+ Initializes the `Resonator` class with the NATS server address.
+
+ Args:
+ magnet (Magnet): The Magnet instance that wraps the NATS connection.
+ """
+ self.magnet = magnet
+
+
+ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000):
+ """
+ Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
+
+ Args:
+ job (bool, optional): If True, use a job-specific durable name. Defaults to None.
+ local (bool, optional): If True, salt the node name with a hash for local runs. Defaults to False.
+ bandwidth (int, optional): Maximum unacknowledged messages in flight (max_ack_pending). Defaults to 1000.
+
+ Returns:
+ None
+
+ Raises:
+ TimeoutError: If there is a timeout error while connecting to the NATS server.
+ Exception: If there is an error in consuming the message or processing the callback function.
+ """
+ self.node = f'{platform.node()}_{x.xxh64(platform.node(), seed=int(utc_timestamp)).hexdigest()}' if local else platform.node()
+ self.durable = f'{self.node}_job' if job else self.node
+ self.consumer_config = ConsumerConfig(
+ ack_policy="explicit"
+ , max_ack_pending=bandwidth
+ , ack_wait=3600
+ )
+ _f('wait', f'connecting to {self.magnet.config.host}')
+ try:
+ self.sub = await self.magnet.js.pull_subscribe(
+ durable=self.magnet.config.session
+ , subject=self.magnet.config.category
+ , stream=self.magnet.config.stream_name
+ , config=self.consumer_config
+ )
+ _f('info',
+ f'joined worker queue: {self.magnet.config.session} as {self.node}')
+ except Exception as e:
+ return _f('fatal', e)
+
+
+
+ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbose=False):
+ """
+ Consume messages from the subscribed category and hand each payload to `cb`.
+ """
+ if not hasattr(self, 'sub'):
+ return _f('fatal', 'no subscriber initialized')
+ if job_n:
+ _f("info",
+ f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
+ try:
+ msgs = await self.sub.fetch(batch=job_n, timeout=60)
+ payloads = [msg.data if generic else Payload(
+ **json.loads(msg.data)) for msg in msgs]
+ try:
+ for payload, msg in zip(payloads, msgs):
+ await cb(payload, msg)
+ except ValueError as e:
+ _f('success', f"job of {job_n} fulfilled\n{e}")
+ except Exception as e:
+ _f('fatal', e)
+ except ValueError as e:
+ _f('warn',
+ f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.stream_name}')
+ except Exception as e:
+ _f('warn', "no more data")
+ else:
+ _f("info",
+ f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
+ while True:
+ try:
+ msgs = await self.sub.fetch(batch=1, timeout=60)
+ _f('info', f"{msgs}") if verbose else None
+ payload = msgs[0].data if generic else Payload(
+ **json.loads(msgs[0].data))
+ _f('info', f"{payload}") if verbose else None
+ try:
+ await cb(payload, msgs[0])
+ except Exception as e:
+ _f("warn", f'retrying connection to {self.magnet.config.host}\n{e}')
+ _f("info", "this can also be a problem with your callback")
+ except Exception as e:
+ _f('warn', f'no more data') if "nats: timeout" in str(e) else _f('fatal', e)
+ break
+
+
+
+ async def worker(self, cb=print):
+ """
+ Consume messages from a specific category in a stream and process them as jobs.
+
+ Args:
+ cb (function, optional): The callback function to process the received messages. Defaults to `print`.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If there is an error in consuming the message or processing the callback function.
+ """
+ _f("info",
+ f'processing jobs from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
+ try:
+ # pull consumers use fetch(); next_msg() is only available on push subscriptions
+ msgs = await self.sub.fetch(batch=1, timeout=60)
+ payload = JobParams(**json.loads(msgs[0].data))
+ try:
+ await cb(payload, msgs[0])
+ except Exception as e:
+ _f("warn", f'something wrong in your callback function!\n{e}')
+ except Exception as e:
+ _f('fatal', f'invalid JSON or no job available\n{e}')
+
+
+
+
+
+
+ async def info(self):
+ """
+ Retrieves information about a consumer in a JetStream stream.
+
+ :return: None
+ """
+ jsm = await self.magnet.js.consumer_info(stream=self.magnet.config.stream_name, consumer=self.magnet.config.session)
+ _f('info', json.dumps(jsm.config.__dict__, indent=2))
+
+
+
+ async def off(self):
+ """
+ Unsubscribes from the category and stream and disconnects from the NATS server.
+
+ :return: None
+ """
+ await self.sub.unsubscribe()
+ _f('warn', f'unsubscribed from {self.magnet.config.stream_name}')
+ await self.magnet.nc.drain()
+ _f('warn', f'safe to disconnect from {self.magnet.config.host}')
+
+
+
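+ # --- usage sketch (illustrative; not part of the original module) ---
+ # Consuming with Resonator, assuming the same hypothetical `magnet` as above;
+ # the callback acks each message, mirroring what `Memory.index` does below:
+ #
+ #   async def handle(payload, msg):
+ #       print(payload.text)
+ #       await msg.ack_sync()
+ #
+ #   reso = Resonator(magnet)
+ #   await reso.on(bandwidth=100)    # pull-subscribe to the configured category
+ #   await reso.listen(cb=handle)    # consume until the stream runs dry
+
+
+# ---- magnet/ize/memory.py ----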
+
+from sentence_transformers import SentenceTransformer
+
+from magnet.utils.globals import _f
+from magnet.utils.globals import Utils
+from magnet.utils.index.milvus import *
+from magnet.utils.data_classes import EmbeddingPayload
+
+from magnet.ic.field import Charge, Magnet
+
+from typing import Optional
+
+
+class Memory:
+ """
+ The Embedder class is responsible for embedding text using a pre-trained sentence transformer model and storing or sending the embeddings for further processing. It utilizes the Milvus database for storing and searching the embeddings.
+
+ Args:
+ config (Config): A Config instance containing the configuration parameters for the Embedder class.
+ create (bool, optional): If set to True, a connection to the Milvus database will be created. Defaults to False.
+
+ Attributes:
+ config (Config): A Config instance containing the configuration parameters for the Embedder class.
+ model (SentenceTransformer): An instance of the SentenceTransformer class from the sentence_transformers library, used for text embedding.
+ db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database.
+ """
+
+ def __init__(self, magnet: Magnet = None):
+ self.config = magnet.config
+ self._model = None
+
+
+ async def on(self, create: bool = False, initialize: bool = False):
+ self._model = SentenceTransformer(self.config.index.model, device=Utils().check_cuda())
+ _f('info', f'loading into {self._model.device}')
+ self.db = MilvusDB(self.config)
+ await self.db.on()
+ if create:
+ await self.db.create(overwrite=True)
+ await self.db.load()
+ if initialize:
+ self.db.initialize()
+
+
+
+ async def index(self, payload, msg, field: Charge = None, v: bool = False, instruction: str = "Represent this sentence for searching relevant passages: "):
+ if not msg or not payload:
+ return _f('fatal', 'no field message and/or payload to ack!')
+ if field:
+ self.field = field
+ try:
+ _f('info', f'encoding payload\n{payload}') if v else None
+ payload.embedding = self._model.encode(
+ f"{instruction} {payload.text}", normalize_embeddings=True)
+ except Exception as e:
+ return _f('fatal', e)
+ await msg.in_progress()
+ try:
+ _f('info', f'indexing payload') if v else None
+ if not await self.is_dupe(q=payload.embedding):
+ self.db.collection.insert([
+ [payload.document], [payload.text], [payload.embedding]
+ ])
+ if field:
+ payload = EmbeddingPayload(
+ model=self.config.index.model,
+ embedding=payload.embedding.tolist(),  # reuse the vector computed above
+ text=payload.text,
+ document=payload.document
+ )
+ _f('info', f'sending payload\n{payload}') if v else None
+ await self.field.pulse(payload)
+ await msg.ack_sync()
+ _f('success', f'embedding indexed\n{payload}') if v else None
+ else:
+ await msg.ack_sync()
+ _f('warn', f'embedding exists already\n{payload}') if v else None
+ except Exception as e:
+ await msg.term()
+ _f('fatal', e)
+
+
+
+ def search(self, payload, limit: int = 100, cb: Optional[callable] = None, instruction: str = "Represent this sentence for searching relevant passages: "):
+ payload = EmbeddingPayload(
+ text=payload,
+ embedding=self._model.encode(
+ f"{instruction} {payload}", normalize_embeddings=True),
+ model=self.config.index.model,
+ document="none"
+ )
+
+ _results = self.db.collection.search(
+ data=[payload.embedding],
+ anns_field="embedding",
+ param=self.config.index.options,
+ limit=limit,
+ output_fields=['text', 'document']
+ )
+ results = []
+ for hits in _results:
+ for hit in hits:
+ results.append({
+ 'text': hit.entity.get('text'),
+ 'document': hit.entity.get('document'),
+ 'distance': hit.distance
+ })
+ if cb:
+ return cb(payload.text, results)
+ else:
+ return results
+
+
+
+
+
+
+
+
+
+ async def delete(self, name: str = None):
+ if name and name == self.config.index.name:
+ try:
+ self.db.delete_index()
+ except Exception as e:
+ _f('fatal', e)
+ else:
+ _f('fatal', "name doesn't match the connection or the connection doesn't exist")
+
+
+
+ async def is_dupe(self, q=None):
+ match = self.db.collection.search(
+ data=[q],
+ anns_field="embedding",
+ param=self.config.index.options,
+ output_fields=['text', 'document'],
+ limit=1
+ )
+ return bool(match and match[0] and match[0][0].distance >= 0.99)
+
+
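+ # --- usage sketch (illustrative; not part of the original module) ---
+ # Indexing and searching with Memory, assuming a hypothetical `magnet` whose
+ # config carries an IndexConfig (model, dimension, Milvus URI):
+ #
+ #   memory = Memory(magnet)
+ #   await memory.on(create=True)    # connect to Milvus and (re)create the collection
+ #   hits = memory.search("what is a magnet?", limit=5)
+ #   for hit in hits:
+ #       print(hit['distance'], hit['text'])
+
+
+# ---- magnet/utils/data_classes.py ----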
+
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Optional
+
+
+@dataclass
+class AskParameters:
+ m: str = "mistralai/Mistral-7B-Instruct-v0.1"
+ q: str = "What is your itinerary?"
+ t: float = 1.0
+ n: int = 8096
+ p: str = "qa_ref"
+ cb: Optional[Callable] = None
+ docs: List[str] = field(default_factory=list)
+ vllm: bool = False
+
+
+
+@dataclass
+class IndexConfig:
+ milvus_uri: Optional[str] = None
+ milvus_port: Optional[int] = None
+ milvus_user: Optional[str] = None
+ milvus_password: Optional[str] = None
+ dimension: Optional[int] = None
+ model: Optional[str] = None
+ name: Optional[str] = None
+ options: Dict[str, Any] = field(default_factory=dict)
+
+
+
+@dataclass
+class MagnetConfig:
+ host: str
+ domain: Optional[str] = None
+ credentials: Optional[str] = None
+ session: Optional[str] = None
+ stream_name: Optional[str] = None
+ category: Optional[str] = None
+ kv_name: Optional[str] = None
+ os_name: Optional[str] = None
+ index: Optional[IndexConfig] = None
+
+
+
+@dataclass
+class Payload:
+ """
+ Represents a payload with two main fields: text and document.
+
+ Args:
+ text (str): The text associated with the payload.
+ document (str): The document associated with the payload.
+ """
+ text: str
+ document: str
+
+
+
+@dataclass
+class GeneratedPayload:
+ """
+ Represents a payload generated by a system.
+
+ Args:
+ query (str): The query associated with the payload.
+ prompt (str): The prompt associated with the payload.
+ context (list): The context associated with the payload.
+ result (str): The result generated by the system.
+ model (str): The model used to generate the payload.
+ """
+ query: str
+ prompt: str
+ context: list
+ result: str
+ model: str
+
+
+
+@dataclass
+class EmbeddingPayload:
+ """
+ Represents a payload for embedding text data.
+
+ Attributes:
+ document (str): The document associated with the text data.
+ embedding (list): The embedding of the text data.
+ text (list): The text of the data.
+ model (str): The model used for embedding the text data.
+ """
+ document: str
+ embedding: list
+ text: list
+ model: str
+
+
+
+@dataclass
+class MistralArgs:
+ """
+ Represents a set of arguments for the Mistral model.
+
+ Args:
+ dim (int): The dimensionality of the model.
+ n_layers (int): The number of layers in the model.
+ head_dim (int): The dimensionality of each attention head.
+ hidden_dim (int): The dimensionality of the hidden layer in the feed-forward network.
+ n_heads (int): The number of attention heads.
+ n_kv_heads (int): The number of attention heads used for key-value attention.
+ norm_eps (float): The epsilon value used for numerical stability in layer normalization.
+ vocab_size (int): The size of the vocabulary used in the model.
+ """
+
+ dim: int
+ n_layers: int
+ head_dim: int
+ hidden_dim: int
+ n_heads: int
+ n_kv_heads: int
+ norm_eps: float
+ vocab_size: int
+
+
+
+@dataclass
+class JobParams:
+ milvus_host: str
+ milvus_port: int
+ milvus_username: str
+ milvus_password: str
+ milvus_collection: str
+ nats_host: str
+ nats_username: str
+ nats_password: str
+ nats_stream: str
+ nats_category: str
+ job_type: str
+ job_n: int
+ embedding_model: str
+ generation_model: str
+
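+# --- usage sketch (illustrative; not part of the original module) ---
+# Wiring the dataclasses together; every literal below is a placeholder:
+#
+#   index = IndexConfig(
+#       milvus_uri="localhost", milvus_port=19530,
+#       dimension=1024, model="BAAI/bge-large-en-v1.5",
+#       name="my_collection", options={"metric_type": "COSINE"},
+#   )
+#   config = MagnetConfig(
+#       host="localhost", session="my_session",
+#       stream_name="documents", category="no_category", index=index,
+#   )
+
+
+# ---- magnet/utils/globals.py ----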
+
+import re
+import os
+import torch
+import random
+import boto3
+from spacy.lang.en import English
+import inspect
+
+
+
+
+
+
+def _f(
+ tag: str = None,
+ body: any = None,
+ no_print: bool = False,
+ luxe: bool = False
+):
+ """
+ The `_f` function is a logging utility that prints messages with different tags and colors based on
+ the provided parameters.
+
+ :param tag: The `tag` parameter is a string that represents the tag for the log message. It can be
+ one of the following values: "FATAL", "WARN", "INFO", "WAIT", or "SUCCESS"
+ :type tag: str
+ :param body: The `body` parameter is used to specify the message or content that you want to
+ display. It can be of any type
+ :type body: any
+ :param no_print: The `no_print` parameter is a boolean flag that determines whether the
+ formatted string is returned instead of printed. If `True`, the string is returned
+ without printing; if `False` (default), it is printed
+ :type no_print: bool (optional)
+ :param luxe: The `luxe` parameter is a boolean flag that determines whether to use a more luxurious
+ and colorful output format. If `luxe` is set to `True`, the output will include random colors,
+ emojis, and matrix-like characters.
+ :type luxe: bool (optional)
+ :return: The function `_f` returns a formatted string if the `no_print` parameter is set to `True`.
+ If `no_print` is `False`, the function prints the formatted string and returns `None`.
+ """
+ tags = [
+ ("FATAL", "☠️", "\033[91m"), # Red color for FATAL
+ ("WARN", "🚨", "\033[93m"), # Yellow color for WARN
+ ("INFO", "ℹ️", "\033[94m"), # Blue color for INFO
+ ("WAIT", "☕️", "\033[96m"), # Cyan color for WAIT
+ ("SUCCESS", "🌊", "\033[92m"), # Green color for SUCCESS
+ ]
+ _luxe = [
+ "\033[31m",
+ "\033[32m",
+ "\033[33m",
+ "\033[34m",
+ "\033[35m",
+ "\033[36m",
+ "\033[91m",
+ "\033[92m",
+ "\033[93m",
+ "\033[94m",
+ "\033[95m",
+ "\033[96m",
+ ]
+ _matrix = ["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"]
+ _joy = [
+ "🍤",
+ "🌈",
+ "📊",
+ "🏁",
+ "🌊",
+ "🧠",
+ "✨",
+ "🧮",
+ "🎉",
+ "🥳",
+ "🤩",
+ "🐈",
+ "❤️",
+ "💙",
+ "💜",
+ "💚",
+ "💛",
+ "🧡",
+ "⭐️",
+ ]
+ matching_tags = [x for x in tags if x[0] == tag.upper()]
+ if matching_tags:
+ tag_text = matching_tags[0][0]
+ emoji = matching_tags[0][1]
+ color_code = matching_tags[0][2]
+ if luxe:
+ return (
+ f"{_luxe[random.randint(0,len(_luxe)-1)]} {_joy[random.randint(0,len(_joy)-1)]} {_matrix[random.randint(0,len(_matrix)-1)]}: {body}\033[0m"
+ if no_print
+ else print(
+ f"{_luxe[random.randint(0,len(_luxe)-1)]} {_joy[random.randint(0,len(_joy)-1)]} {_matrix[random.randint(0,len(_matrix)-1)]}: {body}\033[0m"
+ )
+ )
+ else:
+ return (
+ f"{color_code} {emoji} {tag_text}: {body}\033[0m"
+ if no_print
+ else print(f"{color_code}{emoji} {tag_text}: {body}\033[0m")
+ )
+ else:
+ print(f"😭 UNKNOWN TAG - `{tag}`")
+
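+# --- usage sketch (illustrative; not part of the original module) ---
+#
+#   _f('info', 'starting up')                     # prints "ℹ️ INFO: starting up" in blue
+#   line = _f('warn', 'low disk', no_print=True)  # returns the formatted string instead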
+
+class Utils:
+ """
+ The `Utils` class provides various utility functions for tasks such as checking CUDA availability, normalizing text, and uploading files to Amazon S3.
+
+ Example Usage:
+ # Create an instance of the Utils class
+ utils = Utils()
+
+ # Check if CUDA is available
+ utils.check_cuda()
+
+ # Normalize a text string
+ normalized_text = utils.normalize_text(" This is a sample text. ")
+
+ # Upload a file to Amazon S3
+ utils.upload_to_s3(file_or_dir="path/to/file.txt", keys=("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"), bucket="my-bucket", bucket_path="data")
+
+ Main functionalities:
+ - Checking if CUDA is available on the machine
+ - Normalizing text by performing various cleaning operations
+ - Uploading files to Amazon S3
+
+ Methods:
+ - __init__(): Initializes the Utils class and sets up the spaCy English language model with a sentence tokenizer.
+ - check_cuda(): Checks if CUDA is available on the machine and provides additional information if it is.
+ - normalize_text(_): Cleans a text string by removing whitespace, replacing characters, and removing curly braces.
+ - upload_to_s3(file_or_dir, keys, bucket, bucket_path): Uploads a file or directory to an Amazon S3 bucket using the provided access keys and bucket information.
+
+ Fields:
+ The Utils class does not have any fields.
+ """
+
+ def __init__(self):
+ nlp = English()
+ nlp.add_pipe("sentencizer")
+ self.nlp = nlp
+
+
+ def check_cuda(self):
+ """
+ The function checks if CUDA is available on the machine and provides additional information if
+ it is.
+ """
+ if torch.cuda.is_available():
+ _f("success", "CUDA is available on this machine.")
+ # You can also print additional information like the number of available GPUs:
+ _f("info",
+ f"Number of available GPUs - {torch.cuda.device_count()}")
+ # To get the name of the GPU:
+ _f(
+ "info", f"GPU Name - {torch.cuda.get_device_name(0)}"
+ ) # 0 is the GPU index
+ return 'cuda'
+ else:
+ _f("warn", "CUDA is not available on this machine.")
+ return 'cpu'
+
+
+
+ def normalize_text(self, _):
+ """
+ The `clean` function takes a string as input and performs various cleaning operations on it,
+ such as removing whitespace, replacing characters, and removing curly braces.
+
+ :param _: The parameter "_" is a placeholder for the input string that needs to be cleaned
+ :return: a cleaned version of the input string.
+ """
+ if isinstance(_, list):
+ _f(
+ "warn",
+ f"this item may not process properly because it is a list: \n{_}",
+ )
+ try:
+ if not isinstance(_, str):
+ _f('warn', f'non-string found {type(_)}')
+ _ = _.strip()
+ # strip periods when they make up more than ~0.3% of the characters
+ _ = _.replace('.', '') if _ and (
+ _.count('.') / len(_)) * 100 > 0.3 else _
+
+ # Check if more than 20% of the string is integers
+ num_digits = sum(1 for char in _ if char.isdigit())
+ if _ and (num_digits / len(_)) >= 0.2:
+ _ = "" # Replace with empty quotes
+ else:
+ _ = _.replace('"', "'")
+
+ # Check if the string has at least 5 words
+ words = _.split()
+ if len(words) < 5:
+ _ = "" # Replace with empty quotes if fewer than 5 words
+
+ _ = _.replace("\t", "")
+ _ = _.replace("\n", "")
+ _ = _.replace("\xa0", "")
+ _ = " ".join(_.split())
+ _ = re.sub(r" {[^}]*}", "", _)
+ return str(_)
+ except Exception as e:
+ _f("fatal", e)
+
+
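+ # --- usage sketch (illustrative; not part of the original module) ---
+ #
+ #   utils = Utils()
+ #   clean = utils.normalize_text("  This is a sample sentence with enough words.  ")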
+
+ def upload_to_s3(
+ self,
+ file_or_dir: str = None,
+ keys: tuple = ("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"),
+ bucket: str = None,
+ bucket_path: str = None,
+ ):
+ """
+ Uploads a file or directory to an Amazon S3 bucket.
+
+ Args:
+ file_or_dir (str): The path of the file or directory to be uploaded.
+ keys (tuple): A tuple containing the access keys (access key ID and secret access key) required to access the Amazon S3 bucket. Default is ("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY").
+ bucket (str): The name of the Amazon S3 bucket where the file or directory will be uploaded.
+ bucket_path (str): The path within the bucket where the file or directory will be uploaded.
+
+ Returns:
+ None
+
+ Raises:
+ None
+
+ Example Usage:
+ # Create an instance of the Utils class
+ utils = Utils()
+
+ # Upload a file to Amazon S3
+ utils.upload_to_s3(file_or_dir="path/to/file.txt", keys=("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"), bucket="my-bucket", bucket_path="data")
+ """
+ aws_access_key_id, aws_secret_access_key = keys
+
+ s3 = boto3.client(
+ "s3",
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ )
+ _f("warn", f"uploading to S3 - {file_or_dir}")
+ if os.path.isfile(os.path.abspath(file_or_dir)):
+ s3.upload_file(
+ os.path.abspath(
+ file_or_dir), bucket, f'{bucket_path}/{file_or_dir.split("/")[-1]}'
+ )
+ _f("success",
+ f'uploaded - {bucket}/{bucket_path}/{file_or_dir.split("/")[-1]}')
+ elif os.path.isdir(os.path.abspath(file_or_dir)):
+ for filename in os.listdir(file_or_dir):
+ f = os.path.join(file_or_dir, filename)
+ s3.upload_file(
+ os.path.abspath(
+ f), bucket, f'{bucket_path}/{f.split("/")[-1]}'
+ )
+ _f("success",
+ f'uploaded - {bucket}/{bucket_path}/{f.split("/")[-1]}')
+
+
+
don’t forget to check out the helpful 💻 examples
+the small distributed language model toolkit
+ +⚡️ fine-tune state-of-the-art LLMs anywhere, rapidly ⚡️
pip install llm-magnet
or
python3 setup.py install
⚡️ It’s Fast
+fast on consumer hardware
very fast on Apple Silicon
extremely fast on ROCm/CUDA
🫵 Automatic or your way
rely on established transformer patterns to let magnet do the work
keep your existing data processing functions, bring them to magnet!
🛰️ 100% Distributed
processing, embedding, storage, retrieval, querying, or inference from anywhere
as much or as little compute as you need
🧮 Choose Inference Method
+HuggingFace
vLLM node
GPU
mlx
🌎 Huge Volumes
+handle gigantic amounts of data inexpensively
fault-tolerant by design
decentralized workloads
🔐 Secure
+JWT
Basic
🪵 World-Class Comprehension
magnet optionally logs its own code as it’s executed (yes, really)
build a self-aware system and allow it to learn from itself
emojis are the future
learn more about how Prismadic uses 🧲
+subscribe to our substack
++
AskParameters
+EmbeddingPayload
+GeneratedPayload
+IndexConfig
+JobParams
JobParams.embedding_model
JobParams.generation_model
JobParams.job_n
JobParams.job_type
JobParams.milvus_collection
JobParams.milvus_host
JobParams.milvus_password
JobParams.milvus_port
JobParams.milvus_username
JobParams.nats_category
JobParams.nats_host
JobParams.nats_password
JobParams.nats_stream
JobParams.nats_username
MagnetConfig
+MistralArgs
+Payload