Skip to content

Commit

Permalink
🚧 fix(wip): believe this is parameterized to handle multiple payloads…
Browse files Browse the repository at this point in the history
… as a callback argument
  • Loading branch information
mxchinegod committed Jan 20, 2024
1 parent 990c2b0 commit 883d549
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
2 changes: 1 addition & 1 deletion magnet/electrode.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def auto(self):
self.reso = field.Resonator(f"{self.config['NATS_USER']}:{self.config['NATS_PASSWORD']}@{self.config['NATS_URL']}")
self.embedder = memory.Embedder(self.config, create=self.config["CREATE"])
await self.reso.on(category=self.config['NATS_CATEGORY'], session=self.config['NATS_SESSION'], stream=self.config['NATS_STREAM'])
await self.reso.listen(cb=self.embedder.index, job_range=self.config['JOB_RANGE']) # this will not work yet due to being a list of items, not one at a time
await self.reso.listen(cb=self.embedder.index, job_range=self.config['JOB_RANGE'])



3 changes: 2 additions & 1 deletion magnet/ic/field.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ async def listen(self, cb=print, job_range: tuple = None):
msgs = self.sub.fetch(job_range[1])[job_range[0], job_range[1]]
payloads = [Payload(**json.loads(msg.data)) for msg in msgs]
try:
await cb(payloads, msgs)
for payload, msg in payloads, msgs:
await cb(payload, msg)
except Exception as e:
_f('fatal', e)
except Exception as e:
Expand Down

0 comments on commit 883d549

Please sign in to comment.