From da7e0cb876f93bb9a00a469667739ab85436455c Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Thu, 9 Jan 2025 13:33:17 -0800
Subject: [PATCH 1/8] fix: resolve missing batch processing UI feedback message

---
 app/src/chainlit.py | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/app/src/chainlit.py b/app/src/chainlit.py
index dda635ea..4551adf1 100644
--- a/app/src/chainlit.py
+++ b/app/src/chainlit.py
@@ -255,19 +255,32 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
     try:
         engine: chat_engine.ChatEngineInterface = cl.user_session.get("chat_engine")
+
         result_file_path = await batch_process(file.path, engine)
 
         # E.g., "abcd.csv" to "abcd_results.csv"
         result_file_name = file.name.removesuffix(".csv") + "_results.csv"
+
+        elements = [cl.File(name=result_file_name, path=result_file_path)]
+
         await cl.Message(
-            content="File processed, results attached.",
-            elements=[cl.File(name=result_file_name, path=result_file_path)],
+            author="backend",
+            content=f"File processed, results attached.",
+            metadata={
+                "status": "processing_complete",
+                "original_file": file.name,
+                "result_file": result_file_name
+            },
+            elements=elements
         ).send()
 
     except ValueError as err:
         await cl.Message(
             author="backend",
-            metadata={"error_class": err.__class__.__name__, "error": str(err)},
-            content=f"{err.__class__.__name__}: {err}",
+            content=f"Error processing file: {err}",
+            metadata={
+                "status": "processing_error",
+                "error_class": err.__class__.__name__,
+                "error": str(err)
+            },
         ).send()

From 51061e58dfd419baf69ed9d310014bac61160415 Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Thu, 9 Jan 2025 13:41:22 -0800
Subject: [PATCH 2/8] fix: lint errors

---
 app/src/chainlit.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/app/src/chainlit.py b/app/src/chainlit.py
index 4551adf1..d2d42782 100644
--- a/app/src/chainlit.py
+++ b/app/src/chainlit.py
@@ -255,17 +255,15 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
     try:
         engine: chat_engine.ChatEngineInterface = cl.user_session.get("chat_engine")
-
         result_file_path = await batch_process(file.path, engine)
 
         # E.g., "abcd.csv" to "abcd_results.csv"
         result_file_name = file.name.removesuffix(".csv") + "_results.csv"
 
         elements = [cl.File(name=result_file_name, path=result_file_path)]
-
         await cl.Message(
             author="backend",
-            content=f"File processed, results attached.",
+            content="File processed, results attached.",
             metadata={
                 "status": "processing_complete",
                 "original_file": file.name,
                 "result_file": result_file_name
             },
             elements=elements
         ).send()
 
     except ValueError as err:
         await cl.Message(
             author="backend",
             content=f"Error processing file: {err}",
             metadata={
                 "status": "processing_error",
-                "error_class": err.__class__.__name__, 
+                "error_class": err.__class__.__name__,
                 "error": str(err)
-            },
+            }
         ).send()

From 793b80862010f23d6ebdf31bc707e6cd8c50a87e Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Fri, 10 Jan 2025 06:47:00 -0800
Subject: [PATCH 3/8] style: fix linting issues in chainlit.py

---
 app/src/chainlit.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/app/src/chainlit.py b/app/src/chainlit.py
index d2d42782..dcd9e19c 100644
--- a/app/src/chainlit.py
+++ b/app/src/chainlit.py
@@ -260,7 +260,6 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
 
         # E.g., "abcd.csv" to "abcd_results.csv"
         result_file_name = file.name.removesuffix(".csv") + "_results.csv"
-        elements = [cl.File(name=result_file_name, path=result_file_path)]
         await cl.Message(
             author="backend",
             content="File processed, results attached.",
@@ -269,7 +268,7 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
                 "original_file": file.name,
                 "result_file": result_file_name
             },
-            elements=elements
+            elements=[cl.File(name=result_file_name, path=result_file_path)]
         ).send()
 
     except ValueError as err:
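A minimal sketch of the feedback message these first three patches converge on:
results attached as a cl.File element, with status details carried in metadata.
It assumes the Chainlit cl.Message/cl.File API exactly as used in the diffs
above; the helper name send_results and its arguments are illustrative only.

    import chainlit as cl

    async def send_results(original_name: str, result_path: str) -> None:
        # e.g. "abcd.csv" -> "abcd_results.csv", mirroring the diff above
        result_name = original_name.removesuffix(".csv") + "_results.csv"
        await cl.Message(
            author="backend",
            content="File processed, results attached.",
            metadata={"status": "processing_complete", "original_file": original_name},
            elements=[cl.File(name=result_name, path=result_path)],
        ).send()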
From 5ccfc16bf64e78d66f4d1e484c2210ff84730dfe Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Fri, 10 Jan 2025 08:23:14 -0800
Subject: [PATCH 4/8] fix: add message updates to resolve hanging requests during polling

---
 app/src/batch_process.py | 18 +++++++++++++++++-
 app/src/chainlit.py      | 16 ----------------
 2 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/app/src/batch_process.py b/app/src/batch_process.py
index 9853afeb..0eaa5dc3 100644
--- a/app/src/batch_process.py
+++ b/app/src/batch_process.py
@@ -14,11 +14,27 @@ async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
 
         rows = list(reader)  # Convert reader to list to preserve order
         questions = [row["question"] for row in rows]
+        total_questions = len(questions)
 
     # Process questions sequentially to avoid thread-safety issues with LiteLLM
     # Previous parallel implementation caused high CPU usage due to potential thread-safety
    # concerns in the underlying LLM client libraries
-    processed_data = [_process_question(q, engine) for q in questions]
+    processed_data = []
+
+    # Create a progress message that we'll update
+    import chainlit as cl
+    progress_msg = cl.Message(content="Received file, starting batch processing...")
+    await progress_msg.send()
+
+    for i, q in enumerate(questions, 1):
+        # Update progress message
+        progress_msg.content = f"Processing question {i} of {total_questions}..."
+        await progress_msg.update()
+
+        processed_data.append(_process_question(q, engine))
+
+    # Clean up progress message
+    await progress_msg.remove()
 
     # Update rows with processed data while preserving original order
     for row, data in zip(rows, processed_data, strict=True):
diff --git a/app/src/chainlit.py b/app/src/chainlit.py
index dcd9e19c..c6a94df6 100644
--- a/app/src/chainlit.py
+++ b/app/src/chainlit.py
@@ -248,26 +248,15 @@ def _get_retrieval_metadata(result: OnMessageResult) -> dict:
 
 
 async def _batch_proccessing(file: AskFileResponse) -> None:
-    await cl.Message(
-        author="backend",
-        content="Received file, processing...",
-    ).send()
-
     try:
         engine: chat_engine.ChatEngineInterface = cl.user_session.get("chat_engine")
         result_file_path = await batch_process(file.path, engine)
 
         # E.g., "abcd.csv" to "abcd_results.csv"
         result_file_name = file.name.removesuffix(".csv") + "_results.csv"
         await cl.Message(
             author="backend",
             content="File processed, results attached.",
-            metadata={
-                "status": "processing_complete",
-                "original_file": file.name,
-                "result_file": result_file_name
-            },
             elements=[cl.File(name=result_file_name, path=result_file_path)]
         ).send()
 
@@ -275,9 +264,4 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
         await cl.Message(
             author="backend",
             content=f"Error processing file: {err}",
-            metadata={
-                "status": "processing_error",
-                "error_class": err.__class__.__name__,
-                "error": str(err)
-            }
         ).send()
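The fix above replaces the single up-front "Received file, processing..."
message with one message that is mutated and re-sent while the batch runs, so
the client sees steady traffic instead of one long silent request. A minimal
sketch of that pattern, assuming the same cl.Message send/update/remove API the
diff uses (run_with_progress and items are illustrative names):

    import chainlit as cl

    async def run_with_progress(items: list[str]) -> None:
        progress = cl.Message(content="Starting batch processing...")
        await progress.send()
        for i, item in enumerate(items, 1):
            # Mutate the already-sent message, then push the change to the UI
            progress.content = f"Processing item {i} of {len(items)}..."
            await progress.update()
            # ... per-item work goes here ...
        # Drop the transient progress message once the batch is done
        await progress.remove()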
From 0b648fc7ee1403e9a835768d8051eaf4bdc0c2fb Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Fri, 10 Jan 2025 08:24:44 -0800
Subject: [PATCH 5/8] fix: lint errors

---
 app/src/batch_process.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/app/src/batch_process.py b/app/src/batch_process.py
index 0eaa5dc3..3eff52a3 100644
--- a/app/src/batch_process.py
+++ b/app/src/batch_process.py
@@ -20,7 +20,7 @@ async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
     # Previous parallel implementation caused high CPU usage due to potential thread-safety
     # concerns in the underlying LLM client libraries
     processed_data = []
-    
+
     # Create a progress message that we'll update
     import chainlit as cl
     progress_msg = cl.Message(content="Received file, starting batch processing...")
     await progress_msg.send()
@@ -30,7 +30,7 @@ async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
         # Update progress message
         progress_msg.content = f"Processing question {i} of {total_questions}..."
         await progress_msg.update()
-    
+
         processed_data.append(_process_question(q, engine))
From d234acdb408dba73ddf88c285d6b4191c092dae3 Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Fri, 10 Jan 2025 08:53:58 -0800
Subject: [PATCH 6/8] test: adding mock classes for Chainlit message in test_batch_process.py

---
 app/src/batch_process.py            |  3 +--
 app/src/chainlit.py                 |  2 +-
 app/tests/src/test_batch_process.py | 19 ++++++++++++++++++-
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/app/src/batch_process.py b/app/src/batch_process.py
index 3eff52a3..c0b185fa 100644
--- a/app/src/batch_process.py
+++ b/app/src/batch_process.py
@@ -1,6 +1,7 @@
 import csv
 import tempfile
 
+import chainlit as cl
 from src.chat_engine import ChatEngineInterface
 from src.citations import simplify_citation_numbers
 
@@ -21,8 +22,6 @@ async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
     # concerns in the underlying LLM client libraries
     processed_data = []
 
-    # Create a progress message that we'll update
-    import chainlit as cl
     progress_msg = cl.Message(content="Received file, starting batch processing...")
     await progress_msg.send()
 
diff --git a/app/src/chainlit.py b/app/src/chainlit.py
index c6a94df6..02f231ec 100644
--- a/app/src/chainlit.py
+++ b/app/src/chainlit.py
@@ -257,7 +257,7 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
         await cl.Message(
             author="backend",
             content="File processed, results attached.",
-            elements=[cl.File(name=result_file_name, path=result_file_path)]
+            elements=[cl.File(name=result_file_name, path=result_file_path)],
         ).send()
 
     except ValueError as err:
diff --git a/app/tests/src/test_batch_process.py b/app/tests/src/test_batch_process.py
index ae27fc08..5ea41ade 100644
--- a/app/tests/src/test_batch_process.py
+++ b/app/tests/src/test_batch_process.py
@@ -1,4 +1,5 @@
 import pytest
+from unittest.mock import AsyncMock, MagicMock
 
 from src import chat_engine
 from src.batch_process import _process_question, batch_process
@@ -30,6 +31,22 @@ def invalid_csv(tmp_path):
     return str(csv_path)
 
 
+@pytest.fixture
+def mock_chainlit_message(monkeypatch):
+    mock_message = MagicMock()
+    mock_message.send = AsyncMock()
+    mock_message.update = AsyncMock()
+    mock_message.remove = AsyncMock()
+    
+    class MockMessage:
+        def __init__(self, content):
+            self.content = content
+            for attr, value in mock_message.__dict__.items():
+                setattr(self, attr, value)
+    
+    monkeypatch.setattr("chainlit.Message", MockMessage)
+
+
 @pytest.mark.asyncio
 async def test_batch_process_invalid(invalid_csv, engine):
     engine = chat_engine.create_engine("ca-edd-web")
@@ -38,7 +55,7 @@ async def test_batch_process_invalid(invalid_csv, engine):
 
 
 @pytest.mark.asyncio
-async def test_batch_process(monkeypatch, sample_csv, engine):
+async def test_batch_process(monkeypatch, sample_csv, engine, mock_chainlit_message):
     def mock__process_question(question, engine):
         if question == "What is AI?":
             return {"answer": "Answer to What is AI?", "field_2": "value_2"}
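With chainlit.Message monkeypatched, batch_process can run under pytest without
a live Chainlit session: send, update, and remove all become awaitable no-ops.
A sketch of how a test might consume the fixture (the test name and assertion
are hypothetical; sample_csv and engine are fixtures used elsewhere in this
file):

    import pytest
    from src.batch_process import batch_process

    @pytest.mark.asyncio
    async def test_batch_process_writes_results(mock_chainlit_message, sample_csv, engine):
        result_path = await batch_process(sample_csv, engine)
        assert result_path.endswith(".csv")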
From e08d467b6273cec93c814c0130e8a1acea3e716c Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Fri, 10 Jan 2025 08:55:37 -0800
Subject: [PATCH 7/8] fix: lint errors

---
 app/tests/src/test_batch_process.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/app/tests/src/test_batch_process.py b/app/tests/src/test_batch_process.py
index 5ea41ade..19119cdb 100644
--- a/app/tests/src/test_batch_process.py
+++ b/app/tests/src/test_batch_process.py
@@ -1,6 +1,7 @@
-import pytest
 from unittest.mock import AsyncMock, MagicMock
 
+import pytest
+
 from src import chat_engine
 from src.batch_process import _process_question, batch_process
 from src.chat_engine import OnMessageResult
@@ -37,13 +38,13 @@ def mock_chainlit_message(monkeypatch):
     mock_message.send = AsyncMock()
     mock_message.update = AsyncMock()
     mock_message.remove = AsyncMock()
-    
+
     class MockMessage:
         def __init__(self, content):
             self.content = content
             for attr, value in mock_message.__dict__.items():
                 setattr(self, attr, value)
-    
+
     monkeypatch.setattr("chainlit.Message", MockMessage)
From b3c3886f231fac440260a463b3255f07612c80a9 Mon Sep 17 00:00:00 2001
From: fg-nava <189638926+fg-nava@users.noreply.github.com>
Date: Fri, 10 Jan 2025 10:14:10 -0800
Subject: [PATCH 8/8] feat: add logging to chainlit.py and batch_process.py

---
 app/src/batch_process.py | 10 ++++++++++
 app/src/chainlit.py      |  2 ++
 2 files changed, 12 insertions(+)

diff --git a/app/src/batch_process.py b/app/src/batch_process.py
index c0b185fa..41882d6b 100644
--- a/app/src/batch_process.py
+++ b/app/src/batch_process.py
@@ -1,21 +1,27 @@
 import csv
+import logging
 import tempfile
 
 import chainlit as cl
 from src.chat_engine import ChatEngineInterface
 from src.citations import simplify_citation_numbers
 
+logger = logging.getLogger(__name__)
+
 
 async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
+    logger.info("Starting batch processing of file: %r", file_path)
     with open(file_path, mode="r", newline="", encoding="utf-8") as csvfile:
         reader = csv.DictReader(csvfile)
         if not reader.fieldnames or "question" not in reader.fieldnames:
+            logger.error("Invalid CSV format: missing 'question' column in %r", file_path)
             raise ValueError("CSV file must contain a 'question' column.")
 
         rows = list(reader)  # Convert reader to list to preserve order
         questions = [row["question"] for row in rows]
         total_questions = len(questions)
+        logger.info("Found %d questions to process", total_questions)
 
     # Process questions sequentially to avoid thread-safety issues with LiteLLM
     # Previous parallel implementation caused high CPU usage due to potential thread-safety
@@ -29,6 +35,7 @@ async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
         # Update progress message
         progress_msg.content = f"Processing question {i} of {total_questions}..."
         await progress_msg.update()
+        logger.info("Processing question %d/%d", i, total_questions)
 
         processed_data.append(_process_question(q, engine))
 
@@ -52,10 +59,12 @@ async def batch_process(file_path: str, engine: ChatEngineInterface) -> str:
         writer.writerows(rows)
 
     result_file.close()
+    logger.info("Batch processing complete. Results written to: %r", result_file.name)
     return result_file.name
 
 
 def _process_question(question: str, engine: ChatEngineInterface) -> dict[str, str | None]:
+    logger.debug("Processing question: %r", question)
     result = engine.on_message(question=question, chat_history=[])
     final_result = simplify_citation_numbers(result)
 
@@ -73,4 +82,5 @@ def _process_question(question: str, engine: ChatEngineInterface) -> dict[str, s
             citation_key + "_text": subsection.text,
         }
 
+    logger.debug("Question processed with %d citations", len(final_result.subsections))
     return result_table
diff --git a/app/src/chainlit.py b/app/src/chainlit.py
index 02f231ec..c5b2a45d 100644
--- a/app/src/chainlit.py
+++ b/app/src/chainlit.py
@@ -213,6 +213,7 @@ async def on_message(message: cl.Message) -> None:
             metadata=_get_retrieval_metadata(result),
         ).send()
     except Exception as err:  # pylint: disable=broad-exception-caught
+        logger.exception("Error processing message: %r", message.content)
         await cl.Message(
             author="backend",
             metadata={"error_class": err.__class__.__name__, "error": str(err)},
@@ -261,6 +262,7 @@ async def _batch_proccessing(file: AskFileResponse) -> None:
         ).send()
 
     except ValueError as err:
+        logger.error("Error processing file %r: %s", file.name, err)
         await cl.Message(
             author="backend",
             content=f"Error processing file: {err}",
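The new logger calls only surface if the application configures logging
somewhere; a minimal sketch of such a configuration (assuming the app does not
already set up handlers elsewhere):

    import logging

    logging.basicConfig(
        level=logging.INFO,  # DEBUG also surfaces the per-question logs in _process_question
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )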