diff --git a/magnet/ic/field.py b/magnet/ic/field.py index 767ac39..9ecbb31 100644 --- a/magnet/ic/field.py +++ b/magnet/ic/field.py @@ -143,21 +143,20 @@ def __init__(self, server: str): """ self.server = server - async def on(self, category: str = 'no_category', stream: str = 'documents', cb=print, session='magnet'): + async def on(self, category: str = 'no_category', stream: str = 'documents', session='magnet'): """ - Connects to a NATS server, subscribes to a specific category in a stream, and consumes messages from that category. - + Connects to the NATS server, subscribes to a specific category in a stream, and consumes messages from that category. + Args: category (str, optional): The category to subscribe to. Defaults to 'no_category'. stream (str, optional): The stream to subscribe to. Defaults to 'documents'. - cb (function, optional): The callback function to process the received messages. Defaults to `print`. session (str, optional): The session name for durable subscriptions. Defaults to 'magnet'. - + Returns: None - + Raises: - TimeoutError: If connection to the NATS server times out. + TimeoutError: If there is a timeout error while connecting to the NATS server. Exception: If there is an error in consuming the message or processing the callback function. """ self.category = category @@ -171,6 +170,19 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', cb= _f("success", f'connected to {self.server}') except TimeoutError: _f("fatal", f'could not connect to {self.server}') + async def listen(self, cb=print): + """ + Consume messages from a specific category in a stream. + + Args: + cb (function, optional): The callback function to process the received messages. Defaults to `print`. + + Returns: + None + + Raises: + Exception: If there is an error in consuming the message or processing the callback function. + """ _f("info", f'consuming delta from [{self.category}] on\n🛰️ stream: {self.stream}\n🧲 session: "{self.session}"') while True: try: