Add Kafka #91

Open
wants to merge 14 commits into base: main
File renamed without changes.
80 changes: 80 additions & 0 deletions .docker/docker-compose.dev.yml
@@ -0,0 +1,80 @@
version: "3"
services:
kafka1:
image: confluentinc/cp-kafka
container_name: kafka1
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
KAFKA_LISTENERS: "INTERNAL://kafka1:29092,CONTROLLER://kafka1:29093,EXTERNAL://0.0.0.0:9092"
KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka1:29092,EXTERNAL://localhost:9092"
KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:29093"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
CLUSTER_ID: "vGAyBqiIjJIyN9Tp5B3aVQ=="
KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"

healthcheck:
test: nc -z localhost 9092 || exit 1
interval: 10s
timeout: 5s
retries: 15

# schema-registry0:
# image: confluentinc/cp-schema-registry
# container_name: schema-registry0
# hostname: schema-registry0
# ports:
# - "8081:8081"
# environment:
# SCHEMA_REGISTRY_HOST_NAME: schema-registry0
# SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka1:29092"
# SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081"
# depends_on:
# - kafka1

kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- "3001:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092

# KAFKA_CLUSTERS_0_METRICS_PORT: 9997
# KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8081

depends_on:
- kafka1
# kafka-init-topics:
# image: confluentinc/cp-kafka:7.2.1
# volumes:
# - ./data/message.json:/data/message.json
# depends_on:
# - kafka1
# command: "bash -c 'echo Waiting for Kafka to be ready... && \
# cub kafka-ready -b kafka1:29092 1 30 && \
# kafka-topics --create --topic second.users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
# kafka-topics --create --topic second.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
# kafka-topics --create --topic first.messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka0:29092 && \
# kafka-console-producer --bootstrap-server kafka1:29092 -topic second.users < /data/message.json'"
redis:
image: redis:latest
container_name: redis
ports:
- "6379:6379"
networks:
- my-network

networks:
my-network:
name: my-network
external: true
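The broker runs as a single-node KRaft setup (combined broker and controller) with two client-facing listeners: INTERNAL://kafka1:29092 for other containers (kafka-ui, the commented-out schema registry) and EXTERNAL://localhost:9092 for processes on the host, such as the API server and the consumer described in the README changes below. A hedged smoke test, not part of the PR, for confirming the host-side listener is reachable; it assumes the kafka-python client, which the `.send()`/`.flush()` calls later in this diff suggest but which is not confirmed here:

```python
# Hedged smoke test (not part of this PR): confirm the EXTERNAL listener answers on
# the host before starting the API or the consumer. Assumes the kafka-python package.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # EXTERNAL listener
producer.send("ingestion", b"ping")  # topic name matches the default KAFKA_TOPIC_INGESTION
producer.flush()
producer.close()
print("Broker is reachable on localhost:9092")
```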
12 changes: 12 additions & 0 deletions .env.example
@@ -19,3 +19,15 @@ PINECONE_INDEX=
# Unstructured API
UNSTRUCTURED_IO_API_KEY=
UNSTRUCTURED_IO_SERVER_URL=

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379

# Kafka
KAFKA_TOPIC_INGESTION=ingestion
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 # Comma separated list of kafka brokers (e.g. localhost:9092,localhost:9093)
KAFKA_SECURITY_PROTOCOL=
KAFKA_SASL_MECHANISM=
KAFKA_SASL_PLAIN_USERNAME=
KAFKA_SASL_PLAIN_PASSWORD=
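The modules that read these variables (service/kafka/config.py and service/kafka/producer.py) are imported in api/ingest.py but are not shown in this excerpt. A minimal sketch of what a producer module wired to them could look like, assuming kafka-python and plaintext defaults when the security variables are left empty; the real module may differ:

```python
# Illustrative sketch of service/kafka/producer.py, not the PR's actual module.
import os

from kafka import KafkaProducer

ingest_topic = os.getenv("KAFKA_TOPIC_INGESTION", "ingestion")

_security_protocol = os.getenv("KAFKA_SECURITY_PROTOCOL") or "PLAINTEXT"

_sasl_kwargs = {}
if _security_protocol.startswith("SASL"):
    _sasl_kwargs = {
        "sasl_mechanism": os.getenv("KAFKA_SASL_MECHANISM", "PLAIN"),
        "sasl_plain_username": os.getenv("KAFKA_SASL_PLAIN_USERNAME"),
        "sasl_plain_password": os.getenv("KAFKA_SASL_PLAIN_PASSWORD"),
    }

kafka_producer = KafkaProducer(
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092").split(","),
    security_protocol=_security_protocol,
    **_sasl_kwargs,
)
```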
11 changes: 11 additions & 0 deletions README.md
@@ -55,7 +55,18 @@ Easiest way to get started is to use our [Cloud API](https://d3jvqvcd9u4534.clou
5. Run server
```bash
uvicorn main:app --reload
```
6. Start Kafka & Redis
```bash
docker compose -f .docker/docker-compose.dev.yml up -d
```
7. Start Kafka consumer
```bash
cd ./service/kafka && python3 ./consume.py
```
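The consumer started in step 7 (service/kafka/consume.py) is not included in this excerpt. A minimal sketch of what its loop might look like, assuming kafka-python; the bootstrap address and group id are placeholders, and the real module likely reads them from the environment:

```python
# Illustrative sketch only; the PR's actual consume.py is not shown in this diff.
import asyncio

from kafka import KafkaConsumer

from api.ingest import IngestPayload, ingest
from service.kafka.config import ingest_topic
from service.redis.client import redis_client
from service.redis.ingest_task_manager import IngestTaskManager


def main() -> None:
    consumer = KafkaConsumer(
        ingest_topic,
        bootstrap_servers="localhost:9092",  # assumption; likely KAFKA_BOOTSTRAP_SERVERS in practice
        group_id="ingest-workers",           # hypothetical consumer group
    )
    task_manager = IngestTaskManager(redis_client)
    for record in consumer:
        # Messages are produced as IngestPayload JSON (see api/ingest.py below).
        payload = IngestPayload.model_validate_json(record.value)
        asyncio.run(ingest(payload, task_manager))


if __name__ == "__main__":
    main()
```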



## 🤖 Interpreter mode
Super-Rag has built-in support for running computational Q&A using code interpreters powered by [E2B.dev](https://e2b.dev) custom runtimes. You can sign up to receive an API key to leverage their sandboxes in a cloud environment or set up your own by following [these instructions](https://github.com/e2b-dev/infra).

186 changes: 146 additions & 40 deletions api/ingest.py
@@ -1,62 +1,168 @@
import asyncio
import time
import logging
from typing import Dict

import aiohttp
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse

from models.api import ApiError
from models.ingest import RequestPayload, TaskStatus
from service.embedding import EmbeddingService
from service.ingest import handle_google_drive, handle_urls
from service.kafka.config import ingest_topic
from service.kafka.producer import kafka_producer
from service.redis.client import redis_client
from service.redis.ingest_task_manager import (
    CreateTaskDto,
    IngestTaskManager,
    UpdateTaskDto,
)
from utils.summarise import SUMMARY_SUFFIX


router = APIRouter()


logger = logging.getLogger(__name__)


class IngestPayload(RequestPayload):
    task_id: str


@router.post("/ingest")
async def ingest(payload: RequestPayload) -> Dict:
encoder = payload.document_processor.encoder.get_encoder()
embedding_service = EmbeddingService(
encoder=encoder,
index_name=payload.index_name,
vector_credentials=payload.vector_database,
dimensions=payload.document_processor.encoder.dimensions,
)
chunks = []
summary_documents = []
if payload.files:
chunks, summary_documents = await handle_urls(
embedding_service=embedding_service,
files=payload.files,
config=payload.document_processor,
async def add_ingest_queue(payload: RequestPayload):
try:
task_manager = IngestTaskManager(redis_client)
task_id = task_manager.create(CreateTaskDto(status=TaskStatus.PENDING))

message = IngestPayload(**payload.model_dump(), task_id=str(task_id))

msg = message.model_dump_json().encode()

kafka_producer.send(ingest_topic, msg)
kafka_producer.flush()

logger.info(f"Task {task_id} added to the queue")

return {"success": True, "task": {"id": task_id}}

except Exception as err:
logger.error(f"Error adding task to the queue: {err}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"sucess": False, "error": {"message": "Internal server error"}},
)

elif payload.google_drive:
chunks, summary_documents = await handle_google_drive(
embedding_service, payload.google_drive
) # type: ignore TODO: Fix typing

tasks = [
embedding_service.embed_and_upsert(
chunks=chunks, encoder=encoder, index_name=payload.index_name
),
]
@router.get("/ingest/tasks/{task_id}")
async def get_task(
task_id: str,
long_polling: bool = False,
):
if long_polling:
logger.info(f"Long pooling is enabled for task {task_id}")
else:
logger.info(f"Long pooling is disabled for task {task_id}")

if summary_documents and all(item is not None for item in summary_documents):
tasks.append(
embedding_service.embed_and_upsert(
chunks=summary_documents,
encoder=encoder,
index_name=f"{payload.index_name}{SUMMARY_SUFFIX}",
)
task_manager = IngestTaskManager(redis_client)

def handle_task_not_found(task_id: str):
logger.warning(f"Task {task_id} not found")
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={"success": False, "error": {"message": "Task not found"}},
)

await asyncio.gather(*tasks)
if not long_polling:
task = task_manager.get(task_id)
if not task:
return handle_task_not_found(task_id)
return {"success": True, "task": task.model_dump()}
else:
start_time = time.time()
timeout_time = start_time + 30 # 30 seconds from now
sleep_interval = 3 # seconds

while start_time < timeout_time:
task = task_manager.get(task_id)

if task is None:
return handle_task_not_found(task_id)

if payload.webhook_url:
async with aiohttp.ClientSession() as session:
await session.post(
url=payload.webhook_url,
json={"index_name": payload.index_name, "status": "completed"},
if task.status != TaskStatus.PENDING:
return {"success": True, "task": task.model_dump()}
await asyncio.sleep(sleep_interval)

logger.warning(f"Request timeout for task {task_id}")

return JSONResponse(
status_code=status.HTTP_408_REQUEST_TIMEOUT,
content={"sucess": False, "error": {"message": "Request timeout"}},
)


async def ingest(payload: IngestPayload, task_manager: IngestTaskManager) -> Dict:
    try:
        encoder = payload.document_processor.encoder.get_encoder()
        embedding_service = EmbeddingService(
            encoder=encoder,
            index_name=payload.index_name,
            vector_credentials=payload.vector_database,
            dimensions=payload.document_processor.encoder.dimensions,
        )
        chunks = []
        summary_documents = []
        if payload.files:
            chunks, summary_documents = await handle_urls(
                embedding_service=embedding_service,
                files=payload.files,
                config=payload.document_processor,
            )

        elif payload.google_drive:
            chunks, summary_documents = await handle_google_drive(
                embedding_service, payload.google_drive
            )  # type: ignore TODO: Fix typing

        tasks = [
            embedding_service.embed_and_upsert(
                chunks=chunks, encoder=encoder, index_name=payload.index_name
            ),
        ]

        if summary_documents and all(item is not None for item in summary_documents):
            tasks.append(
                embedding_service.embed_and_upsert(
                    chunks=summary_documents,
                    encoder=encoder,
                    index_name=f"{payload.index_name}{SUMMARY_SUFFIX}",
                )
            )

        await asyncio.gather(*tasks)

        if payload.webhook_url:
            async with aiohttp.ClientSession() as session:
                await session.post(
                    url=payload.webhook_url,
                    json={"index_name": payload.index_name, "status": "completed"},
                )

        task_manager.update(
            task_id=payload.task_id,
            task=UpdateTaskDto(
                status=TaskStatus.DONE,
            ),
        )
    except Exception as e:
        logger.error(f"Error processing ingest task: {e}")
        task_manager.update(
            task_id=payload.task_id,
            task=UpdateTaskDto(
                status=TaskStatus.FAILED,
                error=ApiError(message=str(e)),
            ),
        )
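With this change, POST /ingest no longer does the embedding work inside the request: it records a PENDING task in Redis, publishes the payload to Kafka, and returns a task id that clients check via GET /ingest/tasks/{task_id}. An illustrative client flow; the base URL and route prefix are assumptions, and the request body is intentionally left empty because RequestPayload's full shape is defined outside this diff:

```python
# Illustrative only: assumes a local uvicorn server with the router mounted at the root.
import requests

BASE = "http://localhost:8000"

payload_dict: dict = {
    # index_name, vector_database, document_processor, and files or google_drive go here;
    # omitted because their full shapes are not part of this diff.
}

# Enqueue an ingestion job; the response carries the Redis-backed task id.
created = requests.post(f"{BASE}/ingest", json=payload_dict).json()
task_id = created["task"]["id"]

# One-shot status check.
print(requests.get(f"{BASE}/ingest/tasks/{task_id}").json())

# Long polling: the server re-checks Redis every 3 s for up to ~30 s and returns as soon
# as the task leaves PENDING, otherwise it responds with 408.
print(requests.get(f"{BASE}/ingest/tasks/{task_id}", params={"long_polling": True}).json())
```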
7 changes: 7 additions & 0 deletions models/api.py
@@ -0,0 +1,7 @@
from typing import Optional

from pydantic import BaseModel


class ApiError(BaseModel):
    message: Optional[str]
12 changes: 12 additions & 0 deletions models/ingest.py
@@ -4,6 +4,7 @@
from pydantic import BaseModel, Field
from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder

from models.api import ApiError
from models.file import File
from models.google_drive import GoogleDrive
from models.vector_database import VectorDatabase
@@ -90,3 +91,14 @@ class RequestPayload(BaseModel):
    files: Optional[List[File]] = None
    google_drive: Optional[GoogleDrive] = None
    webhook_url: Optional[str] = None


class TaskStatus(str, Enum):
    DONE = "DONE"
    PENDING = "PENDING"
    FAILED = "FAILED"


class IngestTaskResponse(BaseModel):
    status: TaskStatus
    error: Optional[ApiError] = None
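service/redis/ingest_task_manager.py is used throughout api/ingest.py but is not part of this excerpt. A rough sketch of the interface those call sites imply (create, get, update, plus the DTOs), assuming each task is stored as an IngestTaskResponse JSON blob under a per-task Redis key; the PR's real implementation may differ in key naming, id generation, and expiry:

```python
# Illustrative sketch of service/redis/ingest_task_manager.py, inferred from its call sites.
import uuid
from typing import Optional

from pydantic import BaseModel
from redis import Redis

from models.api import ApiError
from models.ingest import IngestTaskResponse, TaskStatus


class CreateTaskDto(BaseModel):
    status: TaskStatus


class UpdateTaskDto(BaseModel):
    status: TaskStatus
    error: Optional[ApiError] = None


class IngestTaskManager:
    def __init__(self, redis_client: Redis, prefix: str = "ingest:task:"):
        self.redis = redis_client
        self.prefix = prefix  # hypothetical key prefix

    def create(self, task: CreateTaskDto) -> str:
        task_id = str(uuid.uuid4())
        self.redis.set(
            self.prefix + task_id,
            IngestTaskResponse(status=task.status).model_dump_json(),
        )
        return task_id

    def get(self, task_id: str) -> Optional[IngestTaskResponse]:
        raw = self.redis.get(self.prefix + task_id)
        return IngestTaskResponse.model_validate_json(raw) if raw else None

    def update(self, task_id: str, task: UpdateTaskDto) -> None:
        self.redis.set(
            self.prefix + task_id,
            IngestTaskResponse(status=task.status, error=task.error).model_dump_json(),
        )
```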