Skip to content

Commit

Permalink
📖 feat(docs): magnet.ic
Browse files Browse the repository at this point in the history
  • Loading branch information
mxchinegod committed Dec 23, 2023
1 parent 5e76770 commit b082303
Showing 1 changed file with 87 additions and 3 deletions.
90 changes: 87 additions & 3 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,54 @@ async def reset(self, name=None):
_f('fatal', "name doesn't match the stream category or category doesn't exist")

class Resonator:
def __init__(self, server):
"""
The `Resonator` class is responsible for connecting to a NATS server, subscribing to a specific category in a stream, and consuming messages from that category.
Args:
server (str): The address of the NATS server.
Attributes:
server (str): The address of the NATS server.
category (str): The category to subscribe to.
stream (str): The stream to subscribe to.
session (str): The session name for durable subscriptions.
nc (nats.aio.client.Client): The NATS client.
js (nats.aio.client.JetStream): The JetStream client.
sub (nats.aio.client.Subscription): The subscription object.
Methods:
__init__(self, server: str)
on(self, category: str = 'no_category', stream: str = 'documents', cb=print, session='magnet') -> None
info(self, session: str = None) -> None
off(self) -> None
"""

def __init__(self, server: str):
"""
Initializes the `Resonator` object with the server address.
Args:
server (str): The address of the NATS server.
"""
self.server = server

async def on(self, category: str = 'no_category', stream: str = 'documents', cb=print, session='magnet'):
async def on(self, category: str = 'no_category', stream: str = 'documents', cb=print, session='magnet') -> None:
"""
Connects to a NATS server, subscribes to a specific category in a stream, and consumes messages from that category.
Args:
category (str, optional): The category to subscribe to. Defaults to 'no_category'.
stream (str, optional): The stream to subscribe to. Defaults to 'documents'.
cb (function, optional): The callback function to be called for each received message. Defaults to `print`.
session (str, optional): The session name for durable subscriptions. Defaults to 'magnet'.
Returns:
None
Raises:
TimeoutError: If connection to the NATS server times out.
Exception: If there is an error while waiting for the next message or if the received JSON is invalid.
"""
self.category = category
self.stream = stream
self.session = session
Expand All @@ -130,6 +174,35 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', cb=
_f("info", f'consuming delta from [{self.category}] on\n🛰️ stream: {self.stream}\n🧲 session: "{self.session}"')
while True:
try:
msg = await self.sub.next_message(timeout=1)
if msg:
await cb(msg.data)
except Exception as e:
_f("error", f'error while waiting for next message: {e}')

async def info(self, session: str = None) -> None:
"""
Retrieves information about a consumer in a JetStream stream.
Args:
session (str, optional): The session name. Defaults to None.
Returns:
None
"""
session = session or self.session
info = await self.js.consumer_info(self.category, self.stream, session=session)
_f("info", f'consumer info:\n{info}')

def off(self) -> None:
"""
Unsubscribes from the NATS server and disconnects from it.
Returns:
None
"""
self.sub.unsubscribe()
self.nc.close()
msg = await self.sub.next_msg(timeout=60)
payload = Payload(**json.loads(msg.data))
try:
Expand All @@ -139,9 +212,20 @@ async def on(self, category: str = 'no_category', stream: str = 'documents', cb=
except Exception as e:
_f('fatal','invalid JSON')
async def info(self, session: str = None):
"""
Retrieves information about a consumer in a JetStream stream.
:param session: The session name of the consumer for which to retrieve information.
:return: A JSON string representation of the consumer configuration.
"""
jsm = await self.js.consumer_info(stream=self.stream, consumer=session)
_f('info',json.dumps(jsm.config.__dict__, indent=2))
_f('info', json.dumps(jsm.config.__dict__, indent=2))
async def off(self):
"""
Unsubscribes from the NATS server and disconnects from it.
:return: None
"""
await self.sub.unsubscribe()
_f('warn', f'unsubscribed from {self.stream}')
await self.nc.drain()
Expand Down

0 comments on commit b082303

Please sign in to comment.