Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Confluent message consuming refactoring #1904

Open
Lancetnik opened this issue Nov 7, 2024 · 2 comments · May be fixed by #1779
Open

Feature: Confluent message consuming refactoring #1904

Lancetnik opened this issue Nov 7, 2024 · 2 comments · May be fixed by #1779
Labels
Confluent Issues related to `faststream.confluent` module enhancement New feature or request help wanted Extra attention is needed

Comments

@Lancetnik
Copy link
Member

Current ConfluentConsumer implementation is pretty dirty - https://github.com/airtai/faststream/blob/main/faststream/confluent/client.py#L219

It uses sync_to_async everywhere, that can leads to race conditions and segmentation faults in C code.

We should refactor it to use separated thread as synchronous consumer flow and requeue messages from in-memory to asynchronous code. I can suggest use something like that

import threading
from queue import Queue, Empty
from typing import Optional, Any, List
from types import TracebackType

import anyio
from typing_extensions import Self

from faststream._internal.utils.functions import run_in_threadpool


class EventSource:
    def __init__(self) -> None:
        self.thread = threading.Thread(target=self._put_task)

        self._queue = Queue(maxsize=3)
        self.__stop_signal = threading.Event()

    @property
    def completed(self) -> bool: 
        return self._queue.empty()

    async def get_one(self, timeout: Optional[float] = 3.0) -> Optional[Any]:
        try:
            return await run_in_threadpool(self._queue.get, True, timeout)
        except Empty:
            return None

    async def get_many(
        self,
        timeout: float = 0.1,
        max_records: Optional[int] = 10,
    ) -> List[Any]:
        data = []
        with anyio.move_on_after(delay=timeout):
            while len(data) < max_records:
                if msg := await self.get_one(timeout=timeout):
                    data.append(msg)
        return data

    def start(self) -> None:
        self.thread.start()
    
    def stop_nowait(self) -> None:
        self.__stop_signal.set()

    def stop(self) -> None:
        self.stop_nowait()
        self.thread.join()

    def __enter__(self) -> Self:
        self.start()
        return self
    
    def __exit__(
        self,
        exc_type: Optional[type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> None:
        self.stop()

    def _put_task(self) -> None:
        # TODO: implement messages consuming here
        import time
        import random
        while not self.__stop_signal.is_set():
            print("put msg")
            time.sleep(random.randint(5, 20) / 10)
            self._queue.put("CONFLUENT MESSAGE")


async def main():
    with EventSource() as source:
        while True:
            msg = await source.get_many(timeout=3)
            print(len(msg), msg)
        
anyio.run(main)
@Lancetnik Lancetnik added enhancement New feature or request help wanted Extra attention is needed Confluent Issues related to `faststream.confluent` module labels Nov 7, 2024
@joncourt
Copy link

We've noticed segfaults and some kind of deadlock behaviour in our code where a close and a consume operation on the confluent_kafka lib occur simultaneously. We had to do some locking to ensure that didn't happen.

@Lancetnik
Copy link
Member Author

@joncourt it is a good point, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Confluent Issues related to `faststream.confluent` module enhancement New feature or request help wanted Extra attention is needed
Projects
Status: Waiting for merge
2 participants