From 883f6d3a83a8173df574a442fb6042279202c579 Mon Sep 17 00:00:00 2001 From: DylanAlloy Date: Wed, 3 Jan 2024 11:16:21 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20fix(wip):=20check=20for=20duplic?= =?UTF-8?q?ates=20in=20milvus?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../1_callback_index.ipynb | 52 ++----------------- magnet/ize/memory.py | 26 ++++++---- 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/examples/distributed_embedding/1_callback_index.ipynb b/examples/distributed_embedding/1_callback_index.ipynb index 3dec627..416e03b 100644 --- a/examples/distributed_embedding/1_callback_index.ipynb +++ b/examples/distributed_embedding/1_callback_index.ipynb @@ -2,17 +2,9 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[92m🌊 SUCCESS: connected successfully to 192.168.2.69\u001b[0m\n" - ] - } - ], + "outputs": [], "source": [ "from magnet.ize import memory\n", "config = {\n", @@ -35,35 +27,9 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[96m☕️ WAIT: connecting to my-user:T0pS3cr3t@192.168.2.69\u001b[0m\n", - "\u001b[92m🌊 SUCCESS: connected to my-user:T0pS3cr3t@192.168.2.69\u001b[0m\n", - "\u001b[94mℹ️ INFO: consuming delta from [nlp_chunks_deduped] on\n", - "🛰️ stream: documents\n", - "🧲 session: \"bge_nlp_deduped_embedder\"\u001b[0m\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "RPC error: [batch_insert], , \n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[91m☠️ FATAL: \u001b[0m\n" - ] - } - ], + "outputs": [], "source": [ "from magnet.ic import field\n", "reso = field.Resonator(\"my-user:T0pS3cr3t@192.168.2.69\")\n", @@ -77,15 +43,7 @@ "cell_type": "code", "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\u001b[93m🚨 WARN: bge_nlp_deduped deleted\u001b[0m\n" - ] - } - ], + "outputs": [], "source": [ "# index.delete(name=\"bge_nlp_deduped\")" ] diff --git a/magnet/ize/memory.py b/magnet/ize/memory.py index 954f76f..a0f0f56 100644 --- a/magnet/ize/memory.py +++ b/magnet/ize/memory.py @@ -58,11 +58,12 @@ async def embed_and_store(self, payload, verbose=False, field=None): else: try: _f('info', 'storing payload') if verbose else None - self.db.collection.insert([ - [payload.document], [payload.text], [payload.embedding] - ]) - self.db.collection.flush(collection_name_array=[ - self.config['INDEX']]) + if not self.is_dupe(q=payload.embedding): + self.db.collection.insert([ + [payload.document], [payload.text], [payload.embedding] + ]) + self.db.collection.flush(collection_name_array=[ + self.config['INDEX']]) except Exception as e: _f('fatal', e) @@ -96,8 +97,7 @@ async def embed_and_charge(self, payload, verbose=False, field=None): else: raise ValueError('field is required') try: - if verbose: - _f('info', 'embedding payload') + _f('info', 'embedding payload') if verbose else None payload = EmbeddingPayload( model=self.config['MODEL'], embedding=self.model.encode( @@ -105,8 +105,7 @@ async def embed_and_charge(self, payload, verbose=False, field=None): text=payload.text, document=payload.document ) - if verbose: - _f('info', f'sending payload\n{payload}') + _f('info', f'sending payload\n{payload}') if verbose else None await self.field.pulse(payload) except Exception as e: _f('fatal', e) @@ -185,3 +184,12 @@ def delete(self, name=None): _f('fatal', e) else: _f('fatal', "name doesn't match the connection or the connection doesn't exist") + def is_dupe(self, q): + match = self.collection.search( + data=[q] + , anns_field = "embeddings" + , param=self.config['INDEX_PARAMS'] + , output_fields=['text', 'id', 'documentId'] + , limit=1 + ) + return True if sum(match[0].distances) == 0.0 else False \ No newline at end of file