Skip to content

Commit

Permalink
🚧 fix(refactor): wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mxchinegod committed Mar 13, 2024
1 parent 26711f3 commit b365b45
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 21 deletions.
2 changes: 1 addition & 1 deletion magnet/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async def align(self):
domain=self.config.domain
) if self.config.domain is not None else self.nc.jetstream()
if self.config.name:
self.nc = await self._setup_stream()
await self._setup_stream()
if self.config.kv_name:
self.kv = await self._setup_kv()
if self.config.os_name:
Expand Down
20 changes: 10 additions & 10 deletions magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async def reset(self, name=None):
_f('fatal', "name doesn't match the stream category or category doesn't exist")

class Resonator:
def __init__(self, prism: PrismConfig | dict = None):
def __init__(self, prism: Prism):
"""
Initializes the `Resonator` class with the NATS server address.
Expand Down Expand Up @@ -158,26 +158,26 @@ async def on(self, job: bool = None, local: bool = False, bandwidth: int = 1000)
)
_f('wait', f'connecting to {self.prism.config.host}')
try:
self.sub = await self.js.pull_subscribe(
durable=self.prism.session
self.sub = await self.prism.js.pull_subscribe(
durable=self.prism.config.session
, subject=self.prism.config.category
, stream=self.prism.config.name
, config=self.consumer_config
)
_f('info',
f'joined worker queue: {self.prism.session} as {self.node}')
f'joined worker queue: {self.prism.config.session} as {self.node}')
except Exception as e:
return _f('fatal', e)

async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbose=False):

try: self.prism.js.sub
try: self.sub
except: return _f('fatal', 'no subscriber initialized')
if job_n:
_f("info",
f'consuming {job_n} from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.name}\n🧲 session: "{self.prism.session}"')
try:
msgs = await self.prism.js.sub.fetch(batch=job_n, timeout=60)
msgs = await self.sub.fetch(batch=job_n, timeout=60)
payloads = [msg.data if generic else Payload(
**json.loads(msg.data)) for msg in msgs]
try:
Expand All @@ -189,12 +189,12 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo
_f('fatal', e)
except ValueError as e:
_f('warn',
f'{self.session} reached the end of {self.prism.config.category}, {self.prism.config.name}')
f'{self.prism.config.session} reached the end of {self.prism.config.category}, {self.prism.config.name}')
except Exception as e:
_f('fatal', e)
_f('warn', "no more data")
else:
_f("info",
f'consuming delta from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.name}\n🧲 session: "{self.prism.session}"')
f'consuming delta from [{self.prism.config.category}] on\n🛰️ stream: {self.prism.config.name}\n🧲 session: "{self.prism.config.session}"')
while True:
try:
msgs = await self.sub.fetch(batch=1, timeout=60)
Expand All @@ -208,7 +208,7 @@ async def listen(self, cb=print, job_n: int = None, generic: bool = False, verbo
_f("warn", f'retrying connection to {self.prism.config.host}\n{e}')
_f("info", "this can also be a problem with your callback")
except Exception as e:
_f('fatal', f'invalid JSON\n{e}')
_f('warn', f'no more data') if "nats: timeout" in str(e) else _f('fatal', e)
break

async def worker(self, cb=print):
Expand Down
16 changes: 8 additions & 8 deletions magnet/ize/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ async def on(self, create: bool = False, initialize: bool = False):
self.db.initialize()
self.db.load()

async def index(self, payload, msg, field: Charge = None, verbose: bool = False, instruction: str = "Represent this sentence for searching relevant passages: "):
async def index(self, payload, msg, field: Charge = None, v: bool = False, instruction: str = "Represent this sentence for searching relevant passages: "):
if not msg or not payload:
return _f('fatal', 'no field message and/or payload to ack!')
if field:
self.field = field
try:
_f('info', f'encoding payload\n{payload}') if verbose else None
_f('info', f'encoding payload\n{payload}') if v else None
payload.embedding = self._model.encode(
f"{instruction} {payload.text}", normalize_embeddings=True)
except Exception as e:
return _f('fatal', e)
await msg.in_progress()
try:
_f('info', f'indexing payload') if verbose else None
_f('info', f'indexing payload') if v else None
if not await self.is_dupe(q=payload.embedding):
self.db.collection.insert([
[payload.document], [payload.text], [payload.embedding]
Expand All @@ -62,13 +62,13 @@ async def index(self, payload, msg, field: Charge = None, verbose: bool = False,
text=payload.text,
document=payload.document
)
_f('info', f'sending payload\n{payload}') if verbose else None
_f('info', f'sending payload\n{payload}') if v else None
await self.field.pulse(payload)
await msg.ack_sync()
_f('success', f'embedding indexed\n{payload}') if verbose else None
_f('success', f'embedding indexed\n{payload}') if v else None
else:
await msg.ack_sync()
_f('warn', f'embedding exists already\n{payload}') if verbose else None
_f('warn', f'embedding exists already\n{payload}') if v else None
except Exception as e:
await msg.term()
_f('fatal', e)
Expand All @@ -85,7 +85,7 @@ def search(self, payload, limit: int = 100, cb: Optional[callable] = None, instr
_results = self.db.collection.search(
data=[payload.embedding],
anns_field="embedding",
param=self.config.index.params,
param=self.config.index.options,
limit=limit,
output_fields=['text', 'document']
)
Expand Down Expand Up @@ -121,7 +121,7 @@ async def is_dupe(self, q: str = None):
match = self.db.collection.search(
data=[q],
anns_field="embedding",
param=self.config.index.params,
param=self.config.index.options,
output_fields=['text', 'document'],
limit=1
)
Expand Down
3 changes: 1 addition & 2 deletions magnet/utils/milvus.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def on(self):
alias=self.config.session
)
self.schema = CollectionSchema(fields=self.fields)
self.index_params = self.config.index.options
_f('success', f"connected successfully to {self.config.index.milvus_uri}")
except Exception as e:
_f('fatal', e)
Expand All @@ -40,7 +39,7 @@ def create(self, overwrite=False):
utility.drop_collection(self.config.index.name, using=self.config.session)
try:
self.collection = Collection(name=self.config.index.name, schema=self.schema, using=self.config.session)
self.collection.create_index(field_name="embedding", index_params=self.index_params)
self.collection.create_index(field_name="embedding", index_params=self.config.index.options)
_f('success', f"{self.config.index.name} created")
except Exception as e:
_f('fatal', e)
Expand Down

0 comments on commit b365b45

Please sign in to comment.