From d36ba44599606e7bdbdb3806161253638b7252c9 Mon Sep 17 00:00:00 2001 From: Dylan Date: Fri, 15 Mar 2024 08:31:57 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20fix(wip):=20obviously?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/0_embedded_cluster.ipynb | 6 +- examples/{0_prism.ipynb => 0_magnet.ipynb} | 18 ++-- magnet/base.py | 10 +- magnet/ic/field.py | 108 ++++++++++----------- magnet/ize/memory.py | 6 +- tests/test_Prism.py | 28 +++--- 6 files changed, 88 insertions(+), 88 deletions(-) rename examples/{0_prism.ipynb => 0_magnet.ipynb} (97%) diff --git a/examples/0_embedded_cluster.ipynb b/examples/0_embedded_cluster.ipynb index eb60760..382c819 100644 --- a/examples/0_embedded_cluster.ipynb +++ b/examples/0_embedded_cluster.ipynb @@ -57,7 +57,7 @@ "\u001b[93m 🧠 ⣻: Server is ready\n", "\u001b[0m\n", "\u001b[92m🌊 SUCCESS: milvus server started\u001b[0m\n", - "\u001b[96m☕️ WAIT: creating prism with embedded cluster\u001b[0m\n", + "\u001b[96m☕️ WAIT: creating magnet with embedded cluster\u001b[0m\n", "\u001b[93m🚨 WARN: Stream my_stream not found, creating\u001b[0m\n", "\u001b[92m🌊 SUCCESS: created `my_stream` with default category `magnet`\u001b[0m\n", "\u001b[93m🚨 WARN: KV bucket my_kv not found, creating\u001b[0m\n", @@ -80,8 +80,8 @@ "from magnet.base import EmbeddedMagnet\n", "cluster = EmbeddedMagnet()\n", "cluster.start()\n", - "prism = cluster.create_prism()\n", - "await prism.align()\n", + "magnet = cluster.create_magnet()\n", + "await magnet.align()\n", "cluster.stop()\n", "cluster.cleanup()" ] diff --git a/examples/0_prism.ipynb b/examples/0_magnet.ipynb similarity index 97% rename from examples/0_prism.ipynb rename to examples/0_magnet.ipynb index f61f2b5..345303b 100644 --- a/examples/0_prism.ipynb +++ b/examples/0_magnet.ipynb @@ -57,7 +57,7 @@ "\u001b[34m 💜 ⣽: Server is ready\n", "\u001b[0m\n", "\u001b[92m🌊 SUCCESS: milvus server started\u001b[0m\n", - "\u001b[96m☕️ WAIT: creating prism with embedded cluster\u001b[0m\n", + "\u001b[96m☕️ WAIT: creating magnet with embedded cluster\u001b[0m\n", "\u001b[93m🚨 WARN: Stream my_stream not found, creating\u001b[0m\n", "\u001b[92m🌊 SUCCESS: created `my_stream` with default category `magnet`\u001b[0m\n", "\u001b[93m🚨 WARN: KV bucket my_kv not found, creating\u001b[0m\n", @@ -84,13 +84,13 @@ } ], "source": [ - "from magnet.base import Prism\n", + "from magnet.base import Magnet\n", "from magnet.base import EmbeddedMagnet\n", "\n", "cluster = EmbeddedMagnet()\n", "cluster.start()\n", - "prism = cluster.create_prism()\n", - "await prism.align()\n", + "magnet = cluster.create_magnet()\n", + "await magnet.align()\n", "\n", "config = {\n", " \"host\": \"127.0.0.1\",\n", @@ -120,8 +120,8 @@ " }\n", "}\n", "\n", - "prism = Prism(config)\n", - "await prism.align()" + "magnet = Magnet(config)\n", + "await magnet.align()" ] }, { @@ -143,7 +143,7 @@ "source": [ "from magnet.ic.field import Charge\n", "\n", - "field = Charge(prism)\n", + "field = Charge(magnet)\n", "await field.on()" ] }, @@ -229,7 +229,7 @@ ], "source": [ "from magnet.ize.memory import Memory\n", - "mem = Memory(prism)\n", + "mem = Memory(magnet)\n", "await mem.on(create=True)\n" ] }, @@ -271,7 +271,7 @@ "\n", "from magnet.ic.field import Resonator\n", "\n", - "reso = Resonator(prism)\n", + "reso = Resonator(magnet)\n", "\n", "async def handle_payload(payload, msg):\n", " await mem.index(payload, msg, v=True)\n", diff --git a/magnet/base.py b/magnet/base.py index da1561e..25bc22f 100644 --- a/magnet/base.py +++ b/magnet/base.py @@ -37,7 +37,7 @@ } } -class Prism: +class Magnet: def __init__(self, config: PrismConfig | dict = None): try: if isinstance(config, dict): @@ -198,10 +198,10 @@ def stop(self): except Exception as e: _f("warn", f"embedded milvus can't be stopped\n{e}") - def create_prism(self): - _f('wait', 'creating prism with embedded cluster') - prism = Prism(auto_config) - return prism + def create_magnet(self): + _f('wait', 'creating magnet with embedded cluster') + magnet = Magnet(auto_config) + return magnet def cleanup(self): self.client.images.prune() diff --git a/magnet/ic/field.py b/magnet/ic/field.py index 3a6f7ee..12dec26 100644 --- a/magnet/ic/field.py +++ b/magnet/ic/field.py @@ -3,7 +3,7 @@ from dataclasses import asdict from tabulate import tabulate -from magnet.base import Prism +from magnet.base import Magnet from magnet.utils.globals import _f from magnet.utils.data_classes import * @@ -17,12 +17,12 @@ utc_timestamp = utc_time.timestamp() class Charge: - def __init__(self, prism: Prism): - self.prism = prism + def __init__(self, magnet: Magnet): + self.magnet = magnet async def list_streams(self): try: - streams = await self.prism.js.streams_info() + streams = await self.magnet.js.streams_info() remote_streams = [x.config.name for x in streams] remote_subjects = [x.config.subjects for x in streams] data = zip(remote_streams, remote_subjects) @@ -32,13 +32,13 @@ async def list_streams(self): # Loop through each stream and its subjects for stream, subjects in zip(remote_streams, remote_subjects): # Check if the current stream name or any of the subjects match the config variables - match_stream_name = stream == self.prism.config.stream_name - match_subject = self.prism.config.category in subjects + match_stream_name = stream == self.magnet.config.stream_name + match_subject = self.magnet.config.category in subjects # If there's a match, format the stream name and subjects with ANSI green color and a magnet emoji if match_stream_name or match_subject: formatted_stream = f"\033[92m{stream} \U0001F9F2\033[0m" # Green and magnet emoji for stream name - formatted_subjects = [f"\033[92m{subject} \U0001F9F2\033[0m" if subject == self.prism.config.category else subject for subject in subjects] + formatted_subjects = [f"\033[92m{subject} \U0001F9F2\033[0m" if subject == self.magnet.config.category else subject for subject in subjects] else: formatted_stream = stream formatted_subjects = subjects @@ -51,41 +51,41 @@ async def list_streams(self): _f("info", f'\n{table}') except TimeoutError: - return _f('fatal', f'could not connect to {self.prism.config.host}') + return _f('fatal', f'could not connect to {self.magnet.config.host}') except Exception as e: return _f('fatal', e) async def on(self): try: - streams = await self.prism.js.streams_info() + streams = await self.magnet.js.streams_info() remote_streams = [x.config.name for x in streams] remote_subjects = [x.config.subjects for x in streams] - if self.prism.config.stream_name not in remote_streams: - return _f('fatal', f'{self.prism.config.stream_name} not found, initialize with `Prism.align()` first') - elif self.prism.config.category not in sum(remote_subjects, []): - if self.prism.config.category not in sum([x.config.subjects for x in streams if x.config.name == self.prism.config.stream_name], []): + if self.magnet.config.stream_name not in remote_streams: + return _f('fatal', f'{self.magnet.config.stream_name} not found, initialize with `Magnet.align()` first') + elif self.magnet.config.category not in sum(remote_subjects, []): + if self.magnet.config.category not in sum([x.config.subjects for x in streams if x.config.name == self.magnet.config.stream_name], []): try: subjects = sum( - [x.config.subjects for x in streams if x.config.name == self.prism.config.stream_name], []) - subjects.append(self.prism.config.category) - await self.prism.js.update_stream(StreamConfig( - name=self.prism.config.stream_name + [x.config.subjects for x in streams if x.config.name == self.magnet.config.stream_name], []) + subjects.append(self.magnet.config.category) + await self.magnet.js.update_stream(StreamConfig( + name=self.magnet.config.stream_name , subjects=subjects )) - _f("success", f'created [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}') + _f("success", f'created [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}') except ServerError as e: - _f('fatal', f"couldn't create {self.prism.config.stream_name} on {self.prism.config.host}, ensure your `category` is set") + _f('fatal', f"couldn't create {self.magnet.config.stream_name} on {self.magnet.config.host}, ensure your `category` is set") except TimeoutError: - return _f('fatal', f'could not connect to {self.prism.config.host}') - _f("success", f'ready [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}') + return _f('fatal', f'could not connect to {self.magnet.config.host}') + _f("success", f'ready [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}') async def off(self): """ Disconnects from the NATS server and prints a warning message. """ - await self.prism.nc.drain() - await self.prism.nc.close() - _f('warn', f'disconnected from {self.prism.config.host}') + await self.magnet.nc.drain() + await self.magnet.nc.close() + _f('warn', f'disconnected from {self.magnet.config.host}') async def pulse(self, payload: Payload | GeneratedPayload | EmbeddingPayload | JobParams = None, v=False): """ @@ -101,17 +101,17 @@ async def pulse(self, payload: Payload | GeneratedPayload | EmbeddingPayload | J return _f('fatal', f'invalid object, more info:\n{e} in [Payload, GeneratedPayload, EmbeddingPayload, JobParams]') try: _hash = x.xxh64(bytes_).hexdigest() - msg = await self.prism.js.publish( - self.prism.config.category, bytes_, headers={ + msg = await self.magnet.js.publish( + self.magnet.config.category, bytes_, headers={ "Nats-Msg-Id": _hash } ) - _f('success', f'pulsed to {self.prism.config.category} on {self.prism.config.stream_name}') if v else None + _f('success', f'pulsed to {self.magnet.config.category} on {self.magnet.config.stream_name}') if v else None _ts = datetime.datetime.now(datetime.timezone.utc) msg.ts = _ts return msg except Exception as e: - return _f('fatal', f'could not pulse data to {self.prism.config.host}\n{e}') + return _f('fatal', f'could not pulse data to {self.magnet.config.host}\n{e}') async def excite(self, job: dict = {}): """ @@ -126,13 +126,13 @@ async def excite(self, job: dict = {}): _f('fatal', f'invalid JSON\n{e}') try: _hash = x.xxh64(bytes_).hexdigest() - await self.prism.js.publish( - self.prism.config.category, bytes_, headers={ + await self.magnet.js.publish( + self.magnet.config.category, bytes_, headers={ "Nats-Msg-Id": _hash } ) except Exception as e: - _f('fatal', f'could not send data to {self.prism.config.host}\n{e}') + _f('fatal', f'could not send data to {self.magnet.config.host}\n{e}') async def emp(self, name=None): """ @@ -141,9 +141,9 @@ async def emp(self, name=None): Args: name (str, optional): The name of the stream to delete. Defaults to None. """ - if name and name == self.prism.config.stream_name: - await self.prism.js.delete_stream(name=self.prism.config.stream_name) - _f('warn', f'{self.prism.config.stream_name} stream deleted') + if name and name == self.magnet.config.stream_name: + await self.magnet.js.delete_stream(name=self.magnet.config.stream_name) + _f('warn', f'{self.magnet.config.stream_name} stream deleted') else: _f('fatal', "name doesn't match the stream or stream doesn't exist") @@ -154,21 +154,21 @@ async def reset(self, name=None): Args: name (str, optional): The name of the category to purge. Defaults to None. """ - if name and name == self.prism.config.category: - await self.js.purge_stream(name=self.prism.config.stream_name, subject=self.prism.config.category) - _f('warn', f'{self.prism.config.category} category deleted') + if name and name == self.magnet.config.category: + await self.js.purge_stream(name=self.magnet.config.stream_name, subject=self.magnet.config.category) + _f('warn', f'{self.magnet.config.category} category deleted') else: _f('fatal', "name doesn't match the stream category or category doesn't exist") class Resonator: - def __init__(self, prism: Prism): + def __init__(self, magnet: Magnet): """ Initializes the `Resonator` class with the NATS server address. Args: server (str): The address of the NATS server. """ - self.prism = prism + self.magnet = magnet async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000): """ @@ -193,16 +193,16 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000) , max_ack_pending=bandwidth , ack_wait=3600 ) - _f('wait', f'connecting to {self.prism.config.host}') + _f('wait', f'connecting to {self.magnet.config.host}') try: - self.sub = await self.prism.js.pull_subscribe( - durable=self.prism.config.session - , subject=self.prism.config.category - , stream=self.prism.config.stream_name + self.sub = await self.magnet.js.pull_subscribe( + durable=self.magnet.config.session + , subject=self.magnet.config.category + , stream=self.magnet.config.stream_name , config=self.consumer_config ) _f('info', - f'joined worker queue: {self.prism.config.session} as {self.node}') + f'joined worker queue: {self.magnet.config.session} as {self.node}') except Exception as e: return _f('fatal', e) @@ -212,7 +212,7 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo except: return _f('fatal', 'no subscriber initialized') if job_n: _f("info", - f'consuming {job_n} from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}\n🧲 session: "{self.prism.session}"') + f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"') try: msgs = await self.sub.fetch(batch=job_n, timeout=60) payloads = [msg.data if generic else Payload( @@ -226,12 +226,12 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo _f('fatal', e) except ValueError as e: _f('warn', - f'{self.prism.config.session} reached the end of {self.prism.config.category}, {self.prism.config.name}') + f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.name}') except Exception as e: _f('warn', "no more data") else: _f("info", - f'consuming delta from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}\n🧲 session: "{self.prism.config.session}"') + f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"') while True: try: msgs = await self.sub.fetch(batch=1, timeout=60) @@ -242,7 +242,7 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo try: await cb(payload, msgs[0]) except Exception as e: - _f("warn", f'retrying connection to {self.prism.config.host}\n{e}') + _f("warn", f'retrying connection to {self.magnet.config.host}\n{e}') _f("info", "this can also be a problem with your callback") except Exception as e: _f('warn', f'no more data') if "nats: timeout" in str(e) else _f('fatal', e) @@ -262,7 +262,7 @@ async def worker(self, cb=print): Exception: If there is an error in consuming the message or processing the callback function. """ _f("info", - f'processing jobs from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.stream_name}\n🧲 session: "{self.prism.session}"') + f'processing jobs from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"') try: msg = await self.sub.next_msg(timeout=60) payload = JobParams(**json.loads(msg.data)) @@ -283,7 +283,7 @@ async def info(self): :param session: A string representing the session name of the consumer. If not provided, information about all consumers in the stream will be retrieved. :return: None """ - jsm = await self.prism.js.consumer_info(stream=self.prism.config.stream_name, consumer=self.prism.session) + jsm = await self.magnet.js.consumer_info(stream=self.magnet.config.stream_name, consumer=self.magnet.session) _f('info', json.dumps(jsm.config.__dict__, indent=2)) async def off(self): @@ -292,8 +292,8 @@ async def off(self): :return: None """ - await self.prism.js.sub.unsubscribe() - _f('warn', f'unsubscribed from {self.prism.config.stream_name}') + await self.magnet.js.sub.unsubscribe() + _f('warn', f'unsubscribed from {self.magnet.config.stream_name}') await self.nc.drain() - _f('warn', f'safe to disconnect from {self.prism.config.host}') + _f('warn', f'safe to disconnect from {self.magnet.config.host}') diff --git a/magnet/ize/memory.py b/magnet/ize/memory.py index 2478126..6e097b8 100644 --- a/magnet/ize/memory.py +++ b/magnet/ize/memory.py @@ -5,7 +5,7 @@ from magnet.utils.index.milvus import * from magnet.utils.data_classes import EmbeddingPayload -from magnet.ic.field import Charge, Prism +from magnet.ic.field import Charge, Magnet from typing import Optional @@ -23,8 +23,8 @@ class Memory: db (MilvusDB): An instance of the MilvusDB class from the magnet.utils.milvus module, used for connecting to the Milvus database. """ - def __init__(self, prism: Prism = None): - self.config = prism.config + def __init__(self, magnet: Magnet = None): + self.config = magnet.config self._model = None async def on(self, create: bool = False, initialize: bool = False): diff --git a/tests/test_Prism.py b/tests/test_Prism.py index cb3a637..3bef1a7 100644 --- a/tests/test_Prism.py +++ b/tests/test_Prism.py @@ -1,7 +1,7 @@ import pytest -from magnet.ic.field import Prism +from magnet.ic.field import Magnet -# Prism can be initialized with a valid PrismConfig instance or a dictionary. +# Magnet can be initialized with a valid PrismConfig instance or a dictionary. @pytest.mark.asyncio async def test_valid_initialization(): config = { @@ -13,15 +13,15 @@ async def test_valid_initialization(): "os_name": "my_object_store" } - prism = Prism(config) - assert isinstance(prism, Prism) + magnet = Magnet(config) + assert isinstance(magnet, Magnet) @pytest.mark.asyncio async def test_invalid_initialization(): config = "invalid_config" with pytest.raises(ValueError): - Prism(config) + Magnet(config) @pytest.mark.asyncio async def test_prism_connect_and_setup(): @@ -34,10 +34,10 @@ async def test_prism_connect_and_setup(): "os_name": "my_object_store" } - prism = Prism(config) - assert isinstance(prism, Prism) + magnet = Magnet(config) + assert isinstance(magnet, Magnet) - js, kv, os = await prism.align() + js, kv, os = await magnet.align() assert js is not None assert kv is not None assert os is not None @@ -53,11 +53,11 @@ async def test_close_connection(): "os_name": "my_object_store" } - prism = Prism(config) - await prism.align() + magnet = Magnet(config) + await magnet.align() - assert prism.kv is not None - assert prism.os is not None + assert magnet.kv is not None + assert magnet.os is not None - await prism.off() - assert prism.nc.is_closed \ No newline at end of file + await magnet.off() + assert magnet.nc.is_closed \ No newline at end of file