Skip to content

Commit

Permalink
🚧 fix(wip): object store job_n
Browse files Browse the repository at this point in the history
  • Loading branch information
mxchinegod committed Jul 8, 2024
1 parent ef0199e commit b03e2ae
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 31 deletions.
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
project = 'magnet'
copyright = '2023, Prismadic, LLC'
author = 'Prismadic, LLC.'
release = '0.2.9'
release = '0.3.0'

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
Expand All @@ -29,7 +29,7 @@
display_github = True
html_logo = "../magnet.png"
pygments_style = 'dracula'
version = "v0.2.9"
version = "v0.3.0"
release = "latest"
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
Expand Down
61 changes: 34 additions & 27 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,57 +244,64 @@ async def download(self, obj: object = None):
_f('fatal', 'no object store initialized')

async def listen(self, cb=print, job_n: int = None, generic: bool = False, v=False):
try: self.sub
except: return _f('fatal', 'no subscriber initialized')
if job_n:
_f("info",
f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"')
try:
msgs = await self.sub.fetch(batch=job_n, timeout=60)
payloads = [msg.data if generic else Payload(
**json.loads(msg.data)) for msg in msgs]
try:
self.sub
except AttributeError:
return _f('fatal', 'no subscriber initialized')

async def deliver_messages(msgs):
payloads = [msg.data if generic else Payload(**json.loads(msg.data)) for msg in msgs]
for payload, msg in zip(payloads, msgs):
try:
for payload, msg in zip(payloads, msgs):
await cb(payload, msg)
await cb(payload, msg)
except ValueError as e:
_f('success', f"job of {job_n} fulfilled\n{e}")
except Exception as e:
_f('fatal', e)

if job_n:
try:
if type(self.sub).__name__ == "ObjectWatcher":
_f("info", f'consuming objects from [{self.magnet.config.host.split("@")[1]}] from\n🛰️ bucket: {self.magnet.config.os_name}"')
msgs = await self.object_store.list()
for msg in msgs:
await self.download(msg)
await cb(self.object_store, msg)
else:
_f("info", f'consuming {job_n} from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.session}"')
msgs = await self.sub.fetch(batch=job_n, timeout=60)
await deliver_messages(msgs)
except ValueError as e:
_f('warn',
f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.name}')
_f('warn', f'{self.magnet.config.session} reached the end of {self.magnet.config.category}, {self.magnet.config.name}')
except Exception as e:
_f('warn', "no more data")
else:
if type(self.sub).__name__ == "ObjectWatcher":
_f("info", f'consuming objects from [{self.magnet.config.host.split("@")[1]}] from\n🛰️ bucket: {self.magnet.config.stream_name}"')
e = await self.sub.updates()
loop = asyncio.get_event_loop()
loop.create_task(cb(self.object_store, e))
_f("info",
f'consuming objects from [{self.magnet.config.os_name}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
await asyncio.sleep(1)

else:
while True:
try:
msgs = await self.sub.fetch(batch=1, timeout=60)
_f('info', f"{msgs}") if v else None
payload = msgs[0].data if generic else Payload(
**json.loads(msgs[0].data))
_f('info', f"{payload}") if v else None
try:
await cb(payload, msgs[0])
except Exception as e:
_f("warn", f'retrying connection to {self.magnet.config.host}\n{e}')
_f("info", "this can also be a problem with your callback")
if v:
_f('info', f"{msgs}")
payload = msgs[0].data if generic else Payload(**json.loads(msgs[0].data))
if v:
_f('info', f"{payload}")
await cb(payload, msgs[0])
except Exception as e:
if "nats: timeout" in str(e):
_f('warn', 'encountered a timeout, retrying in 1s')
else:
_f('fatal', str(e))
_f("warn", f'retrying connection to {self.magnet.config.host}\n{e}')
_f("info", "this can also be a problem with your callback")
await asyncio.sleep(1)
_f("info",
f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')
_f("info", f'consuming delta from [{self.magnet.config.category}] on\n🛰️ stream: {self.magnet.config.stream_name}\n🧲 session: "{self.magnet.config.session}"')


async def worker(self, cb=print):
"""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "llm_magnet"
version = "0.2.9"
version = "0.3.0"
description = "the small distributed language model toolkit. fine-tune state-of-the-art LLMs anywhere, rapidly."
readme = "dynamic"

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='llm_magnet',
version='0.2.9',
version='0.3.0',
description="the small distributed language model toolkit. fine-tune state-of-the-art LLMs anywhere, rapidly.",
long_description=open('README.md').read(),
long_description_content_type='text/markdown',
Expand Down

0 comments on commit b03e2ae

Please sign in to comment.