Skip to content

Commit

Permalink
Merge pull request #22 from Prismadic/21-fixworkflow-separate-fieldre…
Browse files Browse the repository at this point in the history
…sonators-processing-logic-from-on

🚧 fix(perf): put resonator processing in a function called 'listen'
  • Loading branch information
mxchinegod authored Jan 10, 2024
2 parents fdcfee5 + ee2eeb6 commit d362919
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,20 @@ def __init__(self, server: str):
"""
self.server = server

async def on(self, category: str = 'no_category', stream: str = 'documents', cb=print, session='magnet'):
async def on(self, category: str = 'no_category', stream: str = 'documents', session='magnet'):
"""
Connects to a NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
Args:
category (str, optional): The category to subscribe to. Defaults to 'no_category'.
stream (str, optional): The stream to subscribe to. Defaults to 'documents'.
cb (function, optional): The callback function to process the received messages. Defaults to `print`.
session (str, optional): The session name for durable subscriptions. Defaults to 'magnet'.
Returns:
None
Raises:
TimeoutError: If connection to the NATS server times out.
TimeoutError: If there is a timeout error while connecting to the NATS server.
Exception: If there is an error in consuming the message or processing the callback function.
"""
self.category = category
Expand All @@ -171,6 +170,19 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', cb=
_f("success", f'connected to {self.server}')
except TimeoutError:
_f("fatal", f'could not connect to {self.server}')
async def listen(self, cb=print):
"""
Consume messages from a specific category in a stream.
Args:
cb (function, optional): The callback function to process the received messages. Defaults to `print`.
Returns:
None
Raises:
Exception: If there is an error in consuming the message or processing the callback function.
"""
_f("info", f'consuming delta from [{self.category}] on\n🛰️ stream: {self.stream}\n🧲 session: "{self.session}"')
while True:
try:
Expand Down

0 comments on commit d362919

Please sign in to comment.