From 57cfa34f8e9c9679f1d2623b27d6d484d34fa220 Mon Sep 17 00:00:00 2001 From: DylanAlloy Date: Wed, 10 Jul 2024 15:02:26 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20fix(wip):=20stream=20management?= =?UTF-8?q?=20on=20indexing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/conf.py | 4 ++-- magnet/ic/field.py | 13 ++++++------- magnet/ize/memory.py | 4 ++-- pyproject.toml | 2 +- setup.py | 2 +- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 9fc9280..1b4e82c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -11,7 +11,7 @@ project = 'magnet' copyright = '2023, Prismadic, LLC' author = 'Prismadic, LLC.' -release = '0.3.1' +release = '0.3.11' # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration @@ -29,7 +29,7 @@ display_github = True html_logo = "../magnet.png" pygments_style = 'dracula' -version = "v0.3.1" +version = "v0.3.11" release = "latest" # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output diff --git a/magnet/ic/field.py b/magnet/ic/field.py index d586820..2fbe278 100644 --- a/magnet/ic/field.py +++ b/magnet/ic/field.py @@ -212,7 +212,7 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000, , max_ack_pending=bandwidth , ack_wait=3600 ) - _f('wait', f'connecting to {self.magnet.config.host}') + _f('wait', f'connecting to {self.magnet.config.host.split("@")[1]}') try: if obj: self.sub = await self.magnet.os.watch(include_history=False) @@ -222,7 +222,6 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000, self.sub = await self.magnet.js.pull_subscribe( durable=self.magnet.config.session , subject=self.magnet.config.category - , stream=self.magnet.config.stream_name , config=self.consumer_config ) _f('info', @@ -271,13 +270,13 @@ async def deliver_messages(msgs): await self.download(msg) await cb(self.magnet.os, msg) else: - _f("info", f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"') + _f("info", f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"') msgs = await self.sub.fetch(batch=job_n, timeout=60) await deliver_messages(msgs) except ValueError as e: _f('warn', f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.name}') except Exception as e: - _f('warn', "no more data") + _f('warn', f"no more data\n{e}") else: if type(self.sub).__name__ == "ObjectWatcher": _f("info", f'consuming objects from [{self.magnet.config.host.split("@")[1]}] from\n🛰️ bucket: {self.magnet.config.stream_name}"') @@ -300,7 +299,7 @@ async def deliver_messages(msgs): _f('warn', 'encountered a timeout, retrying in 1s') else: _f('fatal', str(e)) - _f("warn", f'retrying connection to {self.magnet.config.host}\n{e}') + _f("warn", f'retrying connection to {self.magnet.config.host.split("@")[1]}\n{e}') _f("info", "this can also be a problem with your callback") await asyncio.sleep(1) _f("info", f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"') @@ -350,8 +349,8 @@ async def off(self): :return: None """ - await self.magnet.js.sub.unsubscribe() + await self.sub.unsubscribe() _f('warn', f'unsubscribed from {self.magnet.config.stream_name}') await self.nc.drain() - _f('warn', f'safe to disconnect from {self.magnet.config.host}') + _f('warn', f'safe to disconnect from {self.magnet.config.host.split("@")[1]}') diff --git a/magnet/ize/memory.py b/magnet/ize/memory.py index 35b2102..0a73314 100644 --- a/magnet/ize/memory.py +++ b/magnet/ize/memory.py @@ -56,7 +56,7 @@ async def index(self, payload, msg, field=None, v=False, instruction="Represent self.db.collection.insert([ [payload.document], [_chunk], [embedding.tolist()] ]) - _f('success', f'embedding indexed\n{payload}') if v else None + _f('success', f'embedding indexed\n{_chunk}') if v else None if field: payload = EmbeddingPayload( model=self.config.index.model, @@ -64,7 +64,7 @@ async def index(self, payload, msg, field=None, v=False, instruction="Represent text=_chunk, document=payload.document ) - _f('info', f'sending payload\n{payload}') if v else None + _f('info', f'sending payload\n{_chunk}') if v else None await self.field.pulse(payload) else: embedding = self._model.encode(text_to_encode, normalize_embeddings=True) diff --git a/pyproject.toml b/pyproject.toml index c3a77f5..d46882b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "llm_magnet" -version = "0.3.1" +version = "0.3.11" description = "the small distributed language model toolkit. fine-tune state-of-the-art LLMs anywhere, rapidly." readme = "dynamic" diff --git a/setup.py b/setup.py index 083f186..8976242 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name='llm_magnet', - version='0.3.1', + version='0.3.11', description="the small distributed language model toolkit. fine-tune state-of-the-art LLMs anywhere, rapidly.", long_description=open('README.md').read(), long_description_content_type='text/markdown',