From 750314fb0e94f12cb062541c54e06bea5e1a0548 Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 10:39:37 +0400
Subject: [PATCH 01/14] feat: install kafka & redis

---
 poetry.lock    | 34 +++++++++++++++++++++++++++++++++-
 pyproject.toml |  3 +++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/poetry.lock b/poetry.lock
index 6889e530..98d10fd6 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1800,6 +1800,20 @@ traitlets = ">=5.3"
 docs = ["myst-parser", "pydata-sphinx-theme", "sphinx-autodoc-typehints", "sphinxcontrib-github-alt", "sphinxcontrib-spelling", "traitlets"]
 test = ["ipykernel", "pre-commit", "pytest", "pytest-cov", "pytest-timeout"]
 
+[[package]]
+name = "kafka-python"
+version = "2.0.2"
+description = "Pure Python client for Apache Kafka"
+optional = false
+python-versions = "*"
+files = [
+    {file = "kafka-python-2.0.2.tar.gz", hash = "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3"},
+    {file = "kafka_python-2.0.2-py2.py3-none-any.whl", hash = "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"},
+]
+
+[package.extras]
+crc32c = ["crc32c"]
+
 [[package]]
 name = "langdetect"
 version = "1.0.9"
@@ -3166,6 +3180,24 @@ files = [
 [package.extras]
 full = ["numpy"]
 
+[[package]]
+name = "redis"
+version = "5.0.2"
+description = "Python client for Redis database and key-value store"
+optional = false
+python-versions = ">=3.7"
+files = [
+    {file = "redis-5.0.2-py3-none-any.whl", hash = "sha256:4caa8e1fcb6f3c0ef28dba99535101d80934b7d4cd541bbb47f4a3826ee472d1"},
+    {file = "redis-5.0.2.tar.gz", hash = "sha256:3f82cc80d350e93042c8e6e7a5d0596e4dd68715babffba79492733e1f367037"},
+]
+
+[package.dependencies]
+async-timeout = ">=4.0.3"
+
+[package.extras]
+hiredis = ["hiredis (>=1.0.0)"]
+ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"]
+
 [[package]]
 name = "regex"
 version = "2023.12.25"
@@ -4258,4 +4290,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.9,<3.12"
-content-hash = "b9774decb9338d39235bb4599347540fa5a684cecd89d194a95b26257be43364"
+content-hash = "0bc57d0e29028de68b822188a12711cf8ee6b2cd6ead23c34598dd948b13cb06"

diff --git a/pyproject.toml b/pyproject.toml
index 7263d176..d8b99692 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -33,6 +33,9 @@ gunicorn = "^21.2.0"
 unstructured-client = "^0.18.0"
 unstructured = {extras = ["google-drive"], version = "^0.12.4"}
 tiktoken = "^0.6.0"
+kafka-python = "^2.0.2"
+pydantic = "^2.6.3"
+redis = "^5.0.2"
 
 [tool.poetry.group.dev.dependencies]
 termcolor = "^2.4.0"

From da2baeef4830b76fa7485165e696a4f7291ad85b Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 10:41:35 +0400
Subject: [PATCH 02/14] feat: add docker compose

---
 .env.example       |  8 +++++
 docker-compose.yml | 80 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+)
 create mode 100644 docker-compose.yml

diff --git a/.env.example b/.env.example
index 31ff8a25..a4f2a8ae 100644
--- a/.env.example
+++ b/.env.example
@@ -19,3 +19,11 @@ PINECONE_INDEX=
 # Unstructured API
 UNSTRUCTURED_IO_API_KEY=
 UNSTRUCTURED_IO_SERVER_URL=
+
+# Redis
+REDIS_HOST=localhost
+REDIS_PORT=6379
+
+# Kafka
+KAFKA_TOPIC_INGESTION=ingestion
+KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093)
\ No newline at end of file

diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 00000000..71d0569d
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,80 @@
+version: "3"
+services:
+  kafka1:
+    image: confluentinc/cp-kafka
+    container_name: kafka1
+    hostname: kafka1
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_NODE_ID: 1
+      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
+      KAFKA_LISTENERS: "INTERNAL://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://0.0.0.0:9092"
+      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka1:29092,EXTERNAL://localhost:9092"
+      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
+      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:29093"
+      KAFKA_PROCESS_ROLES: "broker,controller"
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      CLUSTER_ID: "vGAyBqiIjJIyN9Tp5B3aVQ=="
+      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
+
+    healthcheck:
+      test: nc -z localhost 9092 || exit 1
+      interval: 10s
+      timeout: 5s
+      retries: 15
+
+  # schema-registry0:
+  #   image: confluentinc/cp-schema-registry
+  #   container_name: schema-registry0
+  #   hostname: schema-registry0
+  #   ports:
+  #     - "8081:8081"
+  #   environment:
+  #     SCHEMA_REGISTRY_HOST_NAME: schema-registry0
+  #     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka1:29092"
+  #     SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
+  #   depends_on:
+  #     - kafka1
+
+  kafka-ui:
+    image: provectuslabs/kafka-ui
+    container_name: kafka-ui
+    ports:
+      - "3001:8080"
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092
+
+      # KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+      # KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8081
+
+    depends_on:
+      - kafka1
+  # kafka-init-topics:
+  #   image: confluentinc/cp-kafka:7.2.1
+  #   volumes:
+  #     - ./data/message.json:/data/message.json
+  #   depends_on:
+  #     - kafka1
+  #   command: "bash -c 'echo Waiting for Kafka to be ready... && \
+  #              cub kafka-ready -b kafka1:29092 1 30 && \
+  #              kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
+  #              kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
+  #              kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \
+  #              kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'"
+  redis:
+    image: redis:latest
+    container_name: redis
+    ports:
+      - "6379:6379"
+    networks:
+      - my-network
+
+networks:
+  my-network:
+    name: my-network
+    external: true

From ff8f99a892ed2c3d15e2511fb59cb8ea81024bd4 Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 10:41:59 +0400
Subject: [PATCH 03/14] feat: add kafka consumer & producer

---
 service/kafka/consumer.py | 16 ++++++++++++++++
 service/kafka/producer.py |  4 ++++
 2 files changed, 20 insertions(+)
 create mode 100644 service/kafka/consumer.py
 create mode 100644 service/kafka/producer.py

diff --git a/service/kafka/consumer.py b/service/kafka/consumer.py
new file mode 100644
index 00000000..29c36265
--- /dev/null
+++ b/service/kafka/consumer.py
@@ -0,0 +1,16 @@
+from kafka import KafkaConsumer
+from service.kafka.config import kafka_bootstrap_servers
+import json
+
+
+def get_kafka_consumer(topic: str):
+    consumer = KafkaConsumer(
+        topic,
+        bootstrap_servers=kafka_bootstrap_servers,
+        auto_offset_reset="earliest",
+        enable_auto_commit=False,
+        value_deserializer=lambda m: json.loads(m.decode("ascii")),
+        group_id="my-group",
+    )
+
+    return consumer

diff --git a/service/kafka/producer.py b/service/kafka/producer.py
new file mode 100644
index 00000000..2d2aa692
--- /dev/null
+++ b/service/kafka/producer.py
@@ -0,0 +1,4 @@
+from service.kafka.config import kafka_bootstrap_servers
+from kafka import KafkaProducer
+
+kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)

From 717f5bb4aab2fbb7fecbbaa63cb1b83061c7d6af Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 10:42:15 +0400
Subject: [PATCH 04/14] feat: add redis client

---
 service/redis/client.py | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 service/redis/client.py

diff --git a/service/redis/client.py b/service/redis/client.py
new file mode 100644
index 00000000..d4ab2143
--- /dev/null
+++ b/service/redis/client.py
@@ -0,0 +1,6 @@
+from redis import Redis
+from decouple import config
+
+redis_client = Redis(
+    host=config("REDIS_HOST", "localhost"), port=config("REDIS_PORT", 6379)
+)

From 5f243d12e53ea0e1d57961b461b47c4429c7d950 Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 10:42:57 +0400
Subject: [PATCH 05/14] feat: pushing ingestion tasks to kafka

---
 api/ingest.py                        | 144 +++++++++++++++++++--------
 models/api.py                        |   6 ++
 models/ingest.py                     |  12 +++
 service/kafka/config.py              |   9 ++
 service/kafka/consume.py             |  41 ++++++++
 service/redis/ingest_task_manager.py |  30 ++++++
 6 files changed, 203 insertions(+), 39 deletions(-)
 create mode 100644 models/api.py
 create mode 100644 service/kafka/config.py
 create mode 100644 service/kafka/consume.py
 create mode 100644 service/redis/ingest_task_manager.py

diff --git a/api/ingest.py b/api/ingest.py
index 7bde0d04..da394c04 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -4,59 +4,125 @@
 import aiohttp
 from fastapi import APIRouter
 
-from models.ingest import RequestPayload
+from models.ingest import RequestPayload, TaskStatus
+from models.api import ApiError
 from service.embedding import EmbeddingService
 from service.ingest import handle_google_drive, handle_urls
 from utils.summarise import SUMMARY_SUFFIX
 
+from service.redis.ingest_task_manager import (
+    IngestTaskManager,
+    CreateTaskDto,
+    UpdateTaskDto,
+)
+from service.redis.client import redis_client
+
+from fastapi.responses import JSONResponse
+from fastapi import status
+from service.kafka.config import kafka_bootstrap_servers, ingest_topic
+from service.kafka.producer import kafka_producer
+
 router = APIRouter()
 
 
+class IngestPayload(RequestPayload):
+    task_id: str
+
+
 @router.post("/ingest")
-async def ingest(payload: RequestPayload) -> Dict:
-    encoder = payload.document_processor.encoder.get_encoder()
-    embedding_service = EmbeddingService(
-        encoder=encoder,
-        index_name=payload.index_name,
-        vector_credentials=payload.vector_database,
-        dimensions=payload.document_processor.encoder.dimensions,
+async def add_ingest_queue(payload: RequestPayload):
+    try:
+        task_manager = IngestTaskManager(redis_client)
+        task_id = task_manager.create(CreateTaskDto(status=TaskStatus.PENDING))
+        print("Task ID: ", task_id)
+
+        message = IngestPayload(**payload.model_dump(), task_id=str(task_id))
+
+        msg = message.model_dump_json().encode()
+
+        kafka_producer.send(ingest_topic, msg)
+        kafka_producer.flush()
+
+        return {"success": True, "task_id": task_id}
+
+    except Exception as err:
+        print(f"error: {err}")
+
+
+@router.get("/ingest/tasks/{task_id}")
+async def get_task(task_id: str):
+    task_manager = IngestTaskManager(redis_client)
+
+    task = task_manager.get(task_id)
+
+    if task:
+        return task
+
+    return JSONResponse(
+        status_code=status.HTTP_404_NOT_FOUND,
+        content={"success": False, "error": {"message": "Task not found"}},
     )
-    chunks = []
-    summary_documents = []
-    if payload.files:
-        chunks, summary_documents = await handle_urls(
-            embedding_service=embedding_service,
-            files=payload.files,
-            config=payload.document_processor,
-        )
-    elif payload.google_drive:
-        chunks, summary_documents = await handle_google_drive(
-            embedding_service, payload.google_drive
-        )  # type: ignore TODO: Fix typing
 
-    tasks = [
-        embedding_service.embed_and_upsert(
-            chunks=chunks, encoder=encoder, index_name=payload.index_name
-        ),
-    ]
 
-    if summary_documents and all(item is not None for item in summary_documents):
-        tasks.append(
+async def ingest(payload: IngestPayload, task_manager: IngestTaskManager) -> Dict:
+    try:
+        encoder = payload.document_processor.encoder.get_encoder()
+        embedding_service = EmbeddingService(
+            encoder=encoder,
+            index_name=payload.index_name,
+            vector_credentials=payload.vector_database,
+            dimensions=payload.document_processor.encoder.dimensions,
+        )
+        chunks = []
+        summary_documents = []
+        if payload.files:
+            chunks, summary_documents = await handle_urls(
+                embedding_service=embedding_service,
+                files=payload.files,
+                config=payload.document_processor,
+            )
+
+        elif payload.google_drive:
+            chunks, summary_documents = await handle_google_drive(
+                embedding_service, payload.google_drive
+            )  # type: ignore TODO: Fix typing
+
+        tasks = [
             embedding_service.embed_and_upsert(
-                chunks=summary_documents,
-                encoder=encoder,
-                index_name=f"{payload.index_name}{SUMMARY_SUFFIX}",
+                chunks=chunks, encoder=encoder, index_name=payload.index_name
+            ),
+        ]
+
+        if summary_documents and all(item is not None for item in summary_documents):
+            tasks.append(
+                embedding_service.embed_and_upsert(
+                    chunks=summary_documents,
+                    encoder=encoder,
+                    index_name=f"{payload.index_name}{SUMMARY_SUFFIX}",
+                )
             )
-        )
 
-    await asyncio.gather(*tasks)
+        await asyncio.gather(*tasks)
 
-    if payload.webhook_url:
-        async with aiohttp.ClientSession() as session:
-            await session.post(
-                url=payload.webhook_url,
-                json={"index_name": payload.index_name, "status": "completed"},
-            )
+        if payload.webhook_url:
+            async with aiohttp.ClientSession() as session:
+                await session.post(
+                    url=payload.webhook_url,
+                    json={"index_name": payload.index_name, "status": "completed"},
+                )
 
-    return {"success": True, "index_name": payload.index_name}
+        task_manager.update(
+            task_id=payload.task_id,
+            task=UpdateTaskDto(
+                status=TaskStatus.DONE,
+            ),
+        )
+    except Exception as e:
+        print("Marking task as failed...", e)
+        task_manager.update(
+            task_id=payload.task_id,
+            task=UpdateTaskDto(
+                status=TaskStatus.FAILED,
+                error=ApiError(message=str(e)),
+            ),
+        )

diff --git a/models/api.py b/models/api.py
new file mode 100644
index 00000000..30629c21
--- /dev/null
+++ b/models/api.py
@@ -0,0 +1,6 @@
+from pydantic import BaseModel
+from typing import Optional
+
+
+class ApiError(BaseModel):
+    message: Optional[str]

diff --git a/models/ingest.py b/models/ingest.py
index e82920d9..2f02bee2 100644
--- a/models/ingest.py
+++ b/models/ingest.py
@@ -7,6 +7,7 @@
 from models.file import File
 from models.google_drive import GoogleDrive
 from models.vector_database import VectorDatabase
+from models.api import ApiError
 
 
 class EncoderProvider(str, Enum):
@@ -90,3 +91,14 @@ class RequestPayload(BaseModel):
     files: Optional[List[File]] = None
     google_drive: Optional[GoogleDrive] = None
     webhook_url: Optional[str] = None
+
+
+class TaskStatus(str, Enum):
+    DONE = "DONE"
+    PENDING = "PENDING"
+    FAILED = "FAILED"
+
+
+class IngestTaskResponse(BaseModel):
+    status: TaskStatus
+    error: Optional[ApiError] = None

diff --git a/service/kafka/config.py b/service/kafka/config.py
new file mode 100644
index 00000000..5f5344e5
--- /dev/null
+++ b/service/kafka/config.py
@@ -0,0 +1,9 @@
+from decouple import config
+
+
+ingest_topic = config("KAFKA_TOPIC_INGEST", default="ingestion")
+
+
+kafka_bootstrap_servers: str = config(
+    "KAFKA_BOOTSTRAP_SERVERS", default="localhost:9092"
+)

diff --git a/service/kafka/consume.py b/service/kafka/consume.py
new file mode 100644
index 00000000..016be57d
--- /dev/null
+++ b/service/kafka/consume.py
@@ -0,0 +1,41 @@
+import asyncio
+from api.ingest import ingest as _ingest, IngestPayload
+
+
+from service.redis.client import redis_client
+from service.redis.ingest_task_manager import IngestTaskManager
+from service.kafka.config import ingest_topic
+from service.kafka.consumer import get_kafka_consumer
+
+from kafka.consumer.fetcher import ConsumerRecord
+
+
+async def ingest(msg: ConsumerRecord):
+    payload = IngestPayload(**msg.value)
+    task_manager = IngestTaskManager(redis_client)
+    await _ingest(payload, task_manager)
+
+
+kafka_actions = {
+    ingest_topic: ingest,
+}
+
+
+async def process_msg(msg: ConsumerRecord, topic: str, consumer):
+    await kafka_actions[topic](msg)
+    consumer.commit()
+
+
+async def consume():
+    consumer = get_kafka_consumer(ingest_topic)
+
+    while True:
+        # Response format is {TopicPartition('topic1', 1): [msg1, msg2]}
+        msg_pack = consumer.poll(timeout_ms=3000)
+
+        for tp, messages in msg_pack.items():
+            for message in messages:
+                await process_msg(message, tp.topic, consumer)
+
+
+asyncio.run(consume())

diff --git a/service/redis/ingest_task_manager.py b/service/redis/ingest_task_manager.py
new file mode 100644
index 00000000..83098bff
--- /dev/null
+++ b/service/redis/ingest_task_manager.py
@@ -0,0 +1,30 @@
+from redis import Redis
+import json
+from models.ingest import IngestTaskResponse
+
+
+class CreateTaskDto(IngestTaskResponse):
+    pass
+
+
+class UpdateTaskDto(IngestTaskResponse):
+    pass
+
+
+class IngestTaskManager:
+    def __init__(self, redis_client: Redis):
+        self.redis_client = redis_client
+
+    def create(self, task: CreateTaskDto):
+        task_id = self.redis_client.incr("task_id")
+        self.redis_client.set(task_id, task.model_dump_json())
+        return task_id
+
+    def get(self, task_id):
+        return IngestTaskResponse(**json.loads(self.redis_client.get(task_id)))
+
+    def update(self, task_id, task: UpdateTaskDto):
+        self.redis_client.set(task_id, task.model_dump_json())
+
+    def delete(self, task_id):
+        self.redis_client.delete(task_id)

From aafe22b646ebbfa9cc32a4d03b4b8dc297cb0507 Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 10:46:59 +0400
Subject: [PATCH 06/14] chore: fix formatting

---
 api/ingest.py                        | 19 ++++++++-----------
 models/api.py                        |  3 ++-
 models/ingest.py                     |  2 +-
 service/kafka/config.py              |  1 -
 service/kafka/consume.py             | 10 +++++-----
 service/kafka/consumer.py            |  4 +++-
 service/kafka/producer.py            |  3 ++-
 service/redis/client.py              |  2 +-
 service/redis/ingest_task_manager.py |  4 +++-
 9 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/api/ingest.py b/api/ingest.py
index da394c04..d67fc758 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -2,25 +2,22 @@
 from typing import Dict
 
 import aiohttp
-from fastapi import APIRouter
+from fastapi import APIRouter, status
+from fastapi.responses import JSONResponse
 
-from models.ingest import RequestPayload, TaskStatus
 from models.api import ApiError
+from models.ingest import RequestPayload, TaskStatus
 from service.embedding import EmbeddingService
 from service.ingest import handle_google_drive, handle_urls
-from utils.summarise import SUMMARY_SUFFIX
-
+from service.kafka.config import ingest_topic
+from service.kafka.producer import kafka_producer
+from service.redis.client import redis_client
 from service.redis.ingest_task_manager import (
-    IngestTaskManager,
     CreateTaskDto,
+    IngestTaskManager,
     UpdateTaskDto,
 )
-from service.redis.client import redis_client
-
-from fastapi.responses import JSONResponse
-from fastapi import status
-from service.kafka.config import kafka_bootstrap_servers, ingest_topic
-from service.kafka.producer import kafka_producer
+from utils.summarise import SUMMARY_SUFFIX
 
 router = APIRouter()

diff --git a/models/api.py b/models/api.py
index 30629c21..1ce29cf5 100644
--- a/models/api.py
+++ b/models/api.py
@@ -1,6 +1,7 @@
-from pydantic import BaseModel
 from typing import Optional
 
+from pydantic import BaseModel
+
 
 class ApiError(BaseModel):
     message: Optional[str]

diff --git a/models/ingest.py b/models/ingest.py
index 2f02bee2..690bcb92 100644
--- a/models/ingest.py
+++ b/models/ingest.py
@@ -4,10 +4,10 @@
 from pydantic import BaseModel, Field
 from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder
 
+from models.api import ApiError
 from models.file import File
 from models.google_drive import GoogleDrive
 from models.vector_database import VectorDatabase
-from models.api import ApiError
 
 
 class EncoderProvider(str, Enum):

diff --git a/service/kafka/config.py b/service/kafka/config.py
index 5f5344e5..80b7f5f9 100644
--- a/service/kafka/config.py
+++ b/service/kafka/config.py
@@ -1,6 +1,5 @@
 from decouple import config
 
-
 ingest_topic = config("KAFKA_TOPIC_INGEST", default="ingestion")
 
 
diff --git a/service/kafka/consume.py b/service/kafka/consume.py
index 016be57d..b1bde197 100644
--- a/service/kafka/consume.py
+++ b/service/kafka/consume.py
@@ -1,13 +1,13 @@
 import asyncio
-from api.ingest import ingest as _ingest, IngestPayload
 
+from kafka.consumer.fetcher import ConsumerRecord
 
-from service.redis.client import redis_client
-from service.redis.ingest_task_manager import IngestTaskManager
+from api.ingest import IngestPayload
+from api.ingest import ingest as _ingest
 from service.kafka.config import ingest_topic
 from service.kafka.consumer import get_kafka_consumer
-
-from kafka.consumer.fetcher import ConsumerRecord
+from service.redis.client import redis_client
+from service.redis.ingest_task_manager import IngestTaskManager
 
 
 async def ingest(msg: ConsumerRecord):

diff --git a/service/kafka/consumer.py b/service/kafka/consumer.py
index 29c36265..c0e7bc4e 100644
--- a/service/kafka/consumer.py
+++ b/service/kafka/consumer.py
@@ -1,6 +1,8 @@
+import json
+
 from kafka import KafkaConsumer
+
 from service.kafka.config import kafka_bootstrap_servers
-import json
 
 
 def get_kafka_consumer(topic: str):

diff --git a/service/kafka/producer.py b/service/kafka/producer.py
index 2d2aa692..a4a65941 100644
--- a/service/kafka/producer.py
+++ b/service/kafka/producer.py
@@ -1,4 +1,5 @@
-from service.kafka.config import kafka_bootstrap_servers
 from kafka import KafkaProducer
 
+from service.kafka.config import kafka_bootstrap_servers
+
 kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)

diff --git a/service/redis/client.py b/service/redis/client.py
index d4ab2143..0a6f61cd 100644
--- a/service/redis/client.py
+++ b/service/redis/client.py
@@ -1,5 +1,5 @@
-from redis import Redis
 from decouple import config
+from redis import Redis
 
 redis_client = Redis(
     host=config("REDIS_HOST", "localhost"), port=config("REDIS_PORT", 6379)

diff --git a/service/redis/ingest_task_manager.py b/service/redis/ingest_task_manager.py
index 83098bff..d793f5c3 100644
--- a/service/redis/ingest_task_manager.py
+++ b/service/redis/ingest_task_manager.py
@@ -1,5 +1,7 @@
-from redis import Redis
 import json
+
+from redis import Redis
+
 from models.ingest import IngestTaskResponse
 
 

From eb8cdcdd255b0c0b6c299c1462bc4945b3c3ce38 Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Mon, 11 Mar 2024 18:43:14 +0400
Subject: [PATCH 07/14] feat: add support for long polling

---
 api/ingest.py | 54 +++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 44 insertions(+), 10 deletions(-)

diff --git a/api/ingest.py b/api/ingest.py
index d67fc758..b96f98e0 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -1,4 +1,6 @@
 import asyncio
+import time
+import logging
 from typing import Dict
 
 import aiohttp
@@ -19,9 +21,13 @@
 )
 from utils.summarise import SUMMARY_SUFFIX
 
+
 router = APIRouter()
 
+logger = logging.getLogger(__name__)
+
+
 class IngestPayload(RequestPayload):
     task_id: str
 
@@ -31,7 +37,6 @@ async def add_ingest_queue(payload: RequestPayload):
     try:
         task_manager = IngestTaskManager(redis_client)
         task_id = task_manager.create(CreateTaskDto(status=TaskStatus.PENDING))
-        print("Task ID: ", task_id)
 
         message = IngestPayload(**payload.model_dump(), task_id=str(task_id))
 
         msg = message.model_dump_json().encode()
@@ -40,25 +45,54 @@ async def add_ingest_queue(payload: RequestPayload):
         kafka_producer.send(ingest_topic, msg)
         kafka_producer.flush()
 
-        return {"success": True, "task_id": task_id}
+        logger.info(f"Task {task_id} added to the queue")
+
+        return {"success": True, "task": {"id": task_id}}
 
     except Exception as err:
         print(f"error: {err}")
@router.get("/ingest/tasks/{task_id}") -async def get_task(task_id: str): +async def get_task( + task_id: str, + long_polling: bool = False, +): + print("ALIALIALI", long_polling) + if long_polling: + logger.info(f"Long pooling is enabled for task {task_id}") + else: + logger.info(f"Long pooling is disabled for task {task_id}") + task_manager = IngestTaskManager(redis_client) - task = task_manager.get(task_id) + if not long_polling: + task = task_manager.get(task_id) + if not task: + logger.warning(f"Task {task_id} not found") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={"sucess": False, "error": {"message": "Task not found"}}, + ) + + return {"success": True, "task": task.model_dump()} + + else: + timeout_time = time.time() + 30 # 30 seconds from now + sleep_interval = 3 # seconds - if task: - return task + while time.time() < timeout_time: + task = task_manager.get(task_id) + if task.status != TaskStatus.PENDING: + return {"success": True, "task": task.model_dump()} + await asyncio.sleep(sleep_interval) - return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content={"sucess": False, "error": {"message": "Task not found"}}, - ) + logger.warning(f"Request timeout for task {task_id}") + + return JSONResponse( + status_code=status.HTTP_408_REQUEST_TIMEOUT, + content={"sucess": False, "error": {"message": "Request timeout"}}, + ) async def ingest(payload: IngestPayload, task_manager: IngestTaskManager) -> Dict: From 8e94713a906f6289a4efbec2481051c661a7e205 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Mon, 11 Mar 2024 20:36:30 +0400 Subject: [PATCH 08/14] chore: remove prints --- api/ingest.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/api/ingest.py b/api/ingest.py index b96f98e0..e2021533 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -50,7 +50,11 @@ async def add_ingest_queue(payload: RequestPayload): return {"success": True, "task": {"id": task_id}} except Exception as err: - print(f"error: {err}") + logger.error(f"Error adding task to the queue: {err}") + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"sucess": False, "error": {"message": "Internal server error"}}, + ) @router.get("/ingest/tasks/{task_id}") @@ -58,7 +62,6 @@ async def get_task( task_id: str, long_polling: bool = False, ): - print("ALIALIALI", long_polling) if long_polling: logger.info(f"Long pooling is enabled for task {task_id}") else: @@ -149,7 +152,7 @@ async def ingest(payload: IngestPayload, task_manager: IngestTaskManager) -> Dic ), ) except Exception as e: - print("Marking task as failed...", e) + logger.error(f"Error processing ingest task: {e}") task_manager.update( task_id=payload.task_id, task=UpdateTaskDto( From 8f94d05ff1e22b364de4edd56f56c82491ae771e Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Tue, 12 Mar 2024 11:05:01 +0400 Subject: [PATCH 09/14] chore: small improvement --- api/ingest.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/api/ingest.py b/api/ingest.py index e2021533..fce429c1 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -81,10 +81,11 @@ async def get_task( return {"success": True, "task": task.model_dump()} else: - timeout_time = time.time() + 30 # 30 seconds from now + start_time = time.time() + timeout_time = start_time + 30 # 30 seconds from now sleep_interval = 3 # seconds - while time.time() < timeout_time: + while start_time < timeout_time: task = task_manager.get(task_id) if task.status != TaskStatus.PENDING: return {"success": True, "task": 
task.model_dump()} From ecf6d04e6ac7af8c7ca2fcd0aff883685f6c9b14 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Tue, 12 Mar 2024 11:05:14 +0400 Subject: [PATCH 10/14] chore(docs): update readme.md --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index 4bf1ba4c..ecadcc3f 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,18 @@ Easiest way to get started is to use our [Cloud API](https://d3jvqvcd9u4534.clou 5. Run server ```bash uvicorn main:app --reload + ``` +6. Start Kafka & Redis + ```bash + docker compose up -d + ``` +7. Start Kafka consumer + ```bash + cd ./service/kafka & python3 ./consume.py ``` + + + ## 🤖 Interpreter mode Super-Rag has built in support for running computational Q&A using code interpreters powered by [E2B.dev](https://e2b.dev) custom runtimes. You can signup to receive an API key to leverage they sandboxes in a cloud environment or setup your own by following [these instructions](https://github.com/e2b-dev/infra). From d12b73ea0f2702f457a0632826869c685e06f4c4 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Tue, 12 Mar 2024 11:49:47 +0400 Subject: [PATCH 11/14] chore: small fix --- api/ingest.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/api/ingest.py b/api/ingest.py index fce429c1..9c8989cc 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -69,17 +69,18 @@ async def get_task( task_manager = IngestTaskManager(redis_client) + def handle_task_not_found(task_id: str): + logger.warning(f"Task {task_id} not found") + return JSONResponse( + status_code=status.HTTP_404_NOT_FOUND, + content={"success": False, "error": {"message": "Task not found"}}, + ) + if not long_polling: task = task_manager.get(task_id) if not task: - logger.warning(f"Task {task_id} not found") - return JSONResponse( - status_code=status.HTTP_404_NOT_FOUND, - content={"sucess": False, "error": {"message": "Task not found"}}, - ) - + return handle_task_not_found(task_id) return {"success": True, "task": task.model_dump()} - else: start_time = time.time() timeout_time = start_time + 30 # 30 seconds from now @@ -87,6 +88,10 @@ async def get_task( while start_time < timeout_time: task = task_manager.get(task_id) + + if task is None: + handle_task_not_found(task_id) + if task.status != TaskStatus.PENDING: return {"success": True, "task": task.model_dump()} await asyncio.sleep(sleep_interval) From 433c2880a90edd9f36b60155559fdf8227380831 Mon Sep 17 00:00:00 2001 From: alisalim17 Date: Tue, 12 Mar 2024 12:23:05 +0400 Subject: [PATCH 12/14] feat: use key prefix in redis --- api/ingest.py | 2 +- service/redis/ingest_task_manager.py | 25 ++++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/api/ingest.py b/api/ingest.py index 9c8989cc..45767ea5 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -90,7 +90,7 @@ def handle_task_not_found(task_id: str): task = task_manager.get(task_id) if task is None: - handle_task_not_found(task_id) + return handle_task_not_found(task_id) if task.status != TaskStatus.PENDING: return {"success": True, "task": task.model_dump()} diff --git a/service/redis/ingest_task_manager.py b/service/redis/ingest_task_manager.py index d793f5c3..964eaa14 100644 --- a/service/redis/ingest_task_manager.py +++ b/service/redis/ingest_task_manager.py @@ -14,19 +14,34 @@ class UpdateTaskDto(IngestTaskResponse): class IngestTaskManager: + TASK_PREFIX = "ingest:task:" + INGESTION_TASK_ID_KEY = "ingestion_task_id" + def __init__(self, redis_client: Redis): self.redis_client = 
 
+    def _get_task_key(self, task_id):
+        return f"{self.TASK_PREFIX}{task_id}"
+
     def create(self, task: CreateTaskDto):
-        task_id = self.redis_client.incr("task_id")
-        self.redis_client.set(task_id, task.model_dump_json())
+        task_id = self.redis_client.incr(self.INGESTION_TASK_ID_KEY)
+        task_key = self._get_task_key(task_id)
+        self.redis_client.set(task_key, task.model_dump_json())
         return task_id
 
     def get(self, task_id):
-        return IngestTaskResponse(**json.loads(self.redis_client.get(task_id)))
+        task_key = self._get_task_key(task_id)
+        task_data = self.redis_client.get(task_key)
+
+        if task_data:
+            return IngestTaskResponse(**json.loads(task_data))
+        else:
+            return None
 
     def update(self, task_id, task: UpdateTaskDto):
-        self.redis_client.set(task_id, task.model_dump_json())
+        task_key = self._get_task_key(task_id)
+        self.redis_client.set(task_key, task.model_dump_json())
 
     def delete(self, task_id):
-        self.redis_client.delete(task_id)
+        task_key = self._get_task_key(task_id)
+        self.redis_client.delete(task_key)

From 00c2ed462d385a80954e0bed7b0405e644b7a4be Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Sat, 16 Mar 2024 11:12:16 +0400
Subject: [PATCH 13/14] chore: move docker files to .docker folder

---
 Dockerfile => .docker/Dockerfile                      | 0
 docker-compose.yml => .docker/docker-compose.dev.yml  | 0
 README.md                                             | 2 +-
 3 files changed, 1 insertion(+), 1 deletion(-)
 rename Dockerfile => .docker/Dockerfile (100%)
 rename docker-compose.yml => .docker/docker-compose.dev.yml (100%)

diff --git a/Dockerfile b/.docker/Dockerfile
similarity index 100%
rename from Dockerfile
rename to .docker/Dockerfile
diff --git a/docker-compose.yml b/.docker/docker-compose.dev.yml
similarity index 100%
rename from docker-compose.yml
rename to .docker/docker-compose.dev.yml
diff --git a/README.md b/README.md
index ecadcc3f..6f51d6b1 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ Easiest way to get started is to use our [Cloud API](https://d3jvqvcd9u4534.clou
    ```
 6. Start Kafka & Redis
    ```bash
-   docker compose up -d
+   docker compose -f docker-compose.dev.yml up -d
    ```
 7. Start Kafka consumer
    ```bash

From ee7dc655aa2c48f10176d1dae46a1b4a3fddcc7e Mon Sep 17 00:00:00 2001
From: alisalim17
Date: Sat, 16 Mar 2024 11:14:29 +0400
Subject: [PATCH 14/14] chore: add sasl password and username

---
 .env.example              |  6 +++++-
 service/kafka/consumer.py |  9 +++++++--
 service/kafka/producer.py | 11 +++++++++--
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/.env.example b/.env.example
index a4f2a8ae..15d40430 100644
--- a/.env.example
+++ b/.env.example
@@ -26,4 +26,8 @@ REDIS_PORT=6379
 
 # Kafka
 KAFKA_TOPIC_INGESTION=ingestion
-KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093)
+KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093)
+KAFKA_SECURITY_PROTOCOL=
+KAFKA_SASL_MECHANISM=
+KAFKA_SASL_PLAIN_USERNAME=
+KAFKA_SASL_PLAIN_PASSWORD=
\ No newline at end of file

diff --git a/service/kafka/consumer.py b/service/kafka/consumer.py
index c0e7bc4e..de150c2e 100644
--- a/service/kafka/consumer.py
+++ b/service/kafka/consumer.py
@@ -1,6 +1,7 @@
 import json
 
 from kafka import KafkaConsumer
+from decouple import config
 
 from service.kafka.config import kafka_bootstrap_servers
 
@@ -9,10 +10,14 @@ def get_kafka_consumer(topic: str):
     consumer = KafkaConsumer(
         topic,
         bootstrap_servers=kafka_bootstrap_servers,
+        group_id="my-group",
+        security_protocol=config("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
+        sasl_mechanism=config("KAFKA_SASL_MECHANISM", "PLAIN"),
+        sasl_plain_username=config("KAFKA_SASL_PLAIN_USERNAME", None),
+        sasl_plain_password=config("KAFKA_SASL_PLAIN_PASSWORD", None),
         auto_offset_reset="earliest",
-        enable_auto_commit=False,
         value_deserializer=lambda m: json.loads(m.decode("ascii")),
-        group_id="my-group",
+        enable_auto_commit=False,
     )
 
     return consumer

diff --git a/service/kafka/producer.py b/service/kafka/producer.py
index a4a65941..b1933bae 100644
--- a/service/kafka/producer.py
+++ b/service/kafka/producer.py
@@ -1,5 +1,12 @@
 from kafka import KafkaProducer
-
+from decouple import config
 from service.kafka.config import kafka_bootstrap_servers
 
-kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers)
+kafka_producer = KafkaProducer(
+    bootstrap_servers=kafka_bootstrap_servers,
+    security_protocol=config("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
+    sasl_mechanism=config("KAFKA_SASL_MECHANISM", "PLAIN"),
+    sasl_plain_username=config("KAFKA_SASL_PLAIN_USERNAME", None),
+    sasl_plain_password=config("KAFKA_SASL_PLAIN_PASSWORD", None),
+    api_version_auto_timeout_ms=100000,
+)