diff --git a/api/ingest.py b/api/ingest.py index da394c04..d67fc758 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -2,25 +2,22 @@ from typing import Dict import aiohttp -from fastapi import APIRouter +from fastapi import APIRouter, status +from fastapi.responses import JSONResponse -from models.ingest import RequestPayload, TaskStatus from models.api import ApiError +from models.ingest import RequestPayload, TaskStatus from service.embedding import EmbeddingService from service.ingest import handle_google_drive, handle_urls -from utils.summarise import SUMMARY_SUFFIX - +from service.kafka.config import ingest_topic +from service.kafka.producer import kafka_producer +from service.redis.client import redis_client from service.redis.ingest_task_manager import ( - IngestTaskManager, CreateTaskDto, + IngestTaskManager, UpdateTaskDto, ) -from service.redis.client import redis_client - -from fastapi.responses import JSONResponse -from fastapi import status -from service.kafka.config import kafka_bootstrap_servers, ingest_topic -from service.kafka.producer import kafka_producer +from utils.summarise import SUMMARY_SUFFIX router = APIRouter() diff --git a/models/api.py b/models/api.py index 30629c21..1ce29cf5 100644 --- a/models/api.py +++ b/models/api.py @@ -1,6 +1,7 @@ -from pydantic import BaseModel from typing import Optional +from pydantic import BaseModel + class ApiError(BaseModel): message: Optional[str] diff --git a/models/ingest.py b/models/ingest.py index 2f02bee2..690bcb92 100644 --- a/models/ingest.py +++ b/models/ingest.py @@ -4,10 +4,10 @@ from pydantic import BaseModel, Field from semantic_router.encoders import BaseEncoder, CohereEncoder, OpenAIEncoder +from models.api import ApiError from models.file import File from models.google_drive import GoogleDrive from models.vector_database import VectorDatabase -from models.api import ApiError class EncoderProvider(str, Enum): diff --git a/service/kafka/config.py b/service/kafka/config.py index 5f5344e5..80b7f5f9 100644 --- a/service/kafka/config.py +++ b/service/kafka/config.py @@ -1,6 +1,5 @@ from decouple import config - ingest_topic = config("KAFKA_TOPIC_INGEST", default="ingestion") diff --git a/service/kafka/consume.py b/service/kafka/consume.py index 016be57d..b1bde197 100644 --- a/service/kafka/consume.py +++ b/service/kafka/consume.py @@ -1,13 +1,13 @@ import asyncio -from api.ingest import ingest as _ingest, IngestPayload +from kafka.consumer.fetcher import ConsumerRecord -from service.redis.client import redis_client -from service.redis.ingest_task_manager import IngestTaskManager +from api.ingest import IngestPayload +from api.ingest import ingest as _ingest from service.kafka.config import ingest_topic from service.kafka.consumer import get_kafka_consumer - -from kafka.consumer.fetcher import ConsumerRecord +from service.redis.client import redis_client +from service.redis.ingest_task_manager import IngestTaskManager async def ingest(msg: ConsumerRecord): diff --git a/service/kafka/consumer.py b/service/kafka/consumer.py index 29c36265..c0e7bc4e 100644 --- a/service/kafka/consumer.py +++ b/service/kafka/consumer.py @@ -1,6 +1,8 @@ +import json + from kafka import KafkaConsumer + from service.kafka.config import kafka_bootstrap_servers -import json def get_kafka_consumer(topic: str): diff --git a/service/kafka/producer.py b/service/kafka/producer.py index 2d2aa692..a4a65941 100644 --- a/service/kafka/producer.py +++ b/service/kafka/producer.py @@ -1,4 +1,5 @@ -from service.kafka.config import kafka_bootstrap_servers from kafka import KafkaProducer +from service.kafka.config import kafka_bootstrap_servers + kafka_producer = KafkaProducer(bootstrap_servers=kafka_bootstrap_servers) diff --git a/service/redis/client.py b/service/redis/client.py index d4ab2143..0a6f61cd 100644 --- a/service/redis/client.py +++ b/service/redis/client.py @@ -1,5 +1,5 @@ -from redis import Redis from decouple import config +from redis import Redis redis_client = Redis( host=config("REDIS_HOST", "localhost"), port=config("REDIS_PORT", 6379) diff --git a/service/redis/ingest_task_manager.py b/service/redis/ingest_task_manager.py index 83098bff..d793f5c3 100644 --- a/service/redis/ingest_task_manager.py +++ b/service/redis/ingest_task_manager.py @@ -1,5 +1,7 @@ -from redis import Redis import json + +from redis import Redis + from models.ingest import IngestTaskResponse