Way to wait until all tasks are handled #350
I'm writing a small script that does a lot of IO with an async library, so I run it via taskiq to parallelize it, and it works great! My main task fetches pages of data in a loop and starts new tasks to process them, but it doesn't wait for them. How can I detect that all the tasks are done, so I can shut down the broker and safely end the script? I can't just await the main task, as it will finish before the other tasks.
Answered by s3rius on Dec 14, 2024
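```python
import asyncio
from typing import Any, Coroutine, Union

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class FinishEventMiddleware(TaskiqMiddleware):
    def __init__(self):
        super().__init__()
        self._count = 0  # number of tasks currently executing
        self.done = asyncio.Event()  # set once the count drops back to zero

    def pre_execute(self, message: "TaskiqMessage") -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
        # Refuse tasks that arrive after the broker was declared done.
        if self.done.is_set():
            raise ValueError('Broker is done')
        self._count += 1
        return message

    def post_execute(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
        self._count -= 1
        if not self._count:
            self.done.set()
```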
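The counting idea behind this middleware can be tried out without a broker. Below is a minimal sketch using only `asyncio`; `InFlightCounter`, `process`, and `main` are hypothetical names made up for illustration and are not part of taskiq. It assumes the parent job registers each child before spawning it and also counts itself, so the event cannot fire while work is still being scheduled.

```python
import asyncio


class InFlightCounter:
    """Hypothetical helper: tracks running jobs, sets `done` at zero."""

    def __init__(self) -> None:
        self._count = 0
        self.done = asyncio.Event()

    def started(self) -> None:
        self._count += 1

    def finished(self) -> None:
        self._count -= 1
        if self._count == 0:
            self.done.set()


async def process(page: int, counter: InFlightCounter, results: list[int]) -> None:
    # Stand-in for a real processing task.
    try:
        await asyncio.sleep(0.01)
        results.append(page * 2)
    finally:
        # Always decrement, even if processing raised.
        counter.finished()


async def main() -> list[int]:
    counter = InFlightCounter()
    results: list[int] = []
    counter.started()  # the main job counts itself
    tasks = []  # keep references so the tasks are not garbage collected
    for page in range(5):
        counter.started()  # register the child before it starts running
        tasks.append(asyncio.create_task(process(page, counter, results)))
    counter.finished()  # main job is done scheduling
    # Wait until every child has finished; time out rather than hang forever.
    await asyncio.wait_for(counter.done.wait(), timeout=5)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that this sketch increments the counter at submission time, whereas the middleware above increments it in `pre_execute`, when a task begins executing. With the middleware, the count could briefly reach zero if the main task finishes before any child task has started, so treat `done` as a hint under that assumption.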
@rafalkrupinski, here's a small update. Now, when you shut down the worker process, it first stops accepting new tasks and then waits until all running tasks are completed.
https://github.com/taskiq-python/taskiq/releases/tag/0.11.9
Here's the documentation about it: https://taskiq-python.github.io/guide/cli.html#graceful-and-force-shutdowns