Skip to content

Commit

Permalink
🚧 fix(wip): check for duplicates in milvus
Browse files Browse the repository at this point in the history
  • Loading branch information
mxchinegod committed Jan 3, 2024
1 parent 16fc843 commit 883f6d3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 56 deletions.
52 changes: 5 additions & 47 deletions examples/distributed_embedding/1_callback_index.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,9 @@
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[92m🌊 SUCCESS: connected successfully to 192.168.2.69\u001b[0m\n"
]
}
],
"outputs": [],
"source": [
"from magnet.ize import memory\n",
"config = {\n",
Expand All @@ -35,35 +27,9 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[96m☕️ WAIT: connecting to my-user:[email protected]\u001b[0m\n",
"\u001b[92m🌊 SUCCESS: connected to my-user:[email protected]\u001b[0m\n",
"\u001b[94mℹ️ INFO: consuming delta from [nlp_chunks_deduped] on\n",
"🛰️ stream: documents\n",
"🧲 session: \"bge_nlp_deduped_embedder\"\u001b[0m\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"RPC error: [batch_insert], <ParamError: (code=1, message=invalid input, length of string exceeds max length. length: 94908, max length: 65535)>, <Time:{'RPC start': '2023-12-23 17:44:57.461964', 'RPC error': '2023-12-23 17:44:57.462059'}>\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[91m☠️ FATAL: <ParamError: (code=1, message=invalid input, length of string exceeds max length. length: 94908, max length: 65535)>\u001b[0m\n"
]
}
],
"outputs": [],
"source": [
"from magnet.ic import field\n",
"reso = field.Resonator(\"my-user:[email protected]\")\n",
Expand All @@ -77,15 +43,7 @@
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[93m🚨 WARN: bge_nlp_deduped deleted\u001b[0m\n"
]
}
],
"outputs": [],
"source": [
"# index.delete(name=\"bge_nlp_deduped\")"
]
Expand Down
26 changes: 17 additions & 9 deletions magnet/ize/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ async def embed_and_store(self, payload, verbose=False, field=None):
else:
try:
_f('info', 'storing payload') if verbose else None
self.db.collection.insert([
[payload.document], [payload.text], [payload.embedding]
])
self.db.collection.flush(collection_name_array=[
self.config['INDEX']])
if not self.is_dupe(q=payload.embedding):
self.db.collection.insert([
[payload.document], [payload.text], [payload.embedding]
])
self.db.collection.flush(collection_name_array=[
self.config['INDEX']])
except Exception as e:
_f('fatal', e)

Expand Down Expand Up @@ -96,17 +97,15 @@ async def embed_and_charge(self, payload, verbose=False, field=None):
else:
raise ValueError('field is required')
try:
if verbose:
_f('info', 'embedding payload')
_f('info', 'embedding payload') if verbose else None
payload = EmbeddingPayload(
model=self.config['MODEL'],
embedding=self.model.encode(
f"Represent this sentence for searching relevant passages: {payload.text}", normalize_embeddings=True).tolist(),
text=payload.text,
document=payload.document
)
if verbose:
_f('info', f'sending payload\n{payload}')
_f('info', f'sending payload\n{payload}') if verbose else None
await self.field.pulse(payload)
except Exception as e:
_f('fatal', e)
Expand Down Expand Up @@ -185,3 +184,12 @@ def delete(self, name=None):
_f('fatal', e)
else:
_f('fatal', "name doesn't match the connection or the connection doesn't exist")
def is_dupe(self, q, tolerance=0.0):
    """Report whether a vector matching ``q`` is already stored.

    Runs a top-1 vector search on ``self.collection`` and treats ``q`` as a
    duplicate when the closest hit is at distance zero (within
    ``tolerance``).

    Args:
        q: The embedding vector to look up.
        tolerance: Largest absolute distance still counted as a duplicate.
            The default of ``0.0`` keeps the original exact-match check.

    Returns:
        ``True`` if the closest hit is within ``tolerance`` of ``q``;
        ``False`` otherwise, including when the search yields no hits.
    """
    # NOTE(review): a zero distance only means "identical" for distance-style
    # metrics such as L2; for similarity metrics (IP/COSINE) an identical
    # vector would score high instead -- confirm against INDEX_PARAMS.
    # NOTE(review): the caller reaches the collection via
    # ``self.db.collection``; verify this method lives on the object that
    # owns ``self.collection``.
    match = self.collection.search(
        data=[q],
        anns_field="embeddings",
        param=self.config['INDEX_PARAMS'],
        output_fields=['text', 'id', 'documentId'],
        limit=1,
    )
    if not match:
        return False
    distances = list(match[0].distances)
    # With no hits, summing the distances gave 0 and the payload was
    # misreported as a duplicate, so nothing could ever be inserted into an
    # empty collection.
    if not distances:
        return False
    # limit=1 means there is at most one hit: the nearest neighbour.
    return abs(distances[0]) <= tolerance

0 comments on commit 883f6d3

Please sign in to comment.