Skip to content

Commit

Permalink
[NHUB-595] fix(celery): Dont use async for task.apply_async in celery…
Browse files Browse the repository at this point in the history
… beat
  • Loading branch information
MarkLark86 committed Nov 26, 2024
1 parent fa2f824 commit bc35562
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 16 deletions.
10 changes: 7 additions & 3 deletions superdesk/celery_app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from sys import argv
import redis
from celery import Celery

from .context_task import HybridAppContextTask
from .context_task import HybridAppContextTask, HybridAppContextWorkerTask
from .serializer import CELERY_SERIALIZER_NAME, ContextAwareSerializerFactory

from superdesk.logging import logger
Expand All @@ -22,9 +23,12 @@
serializer_factory = ContextAwareSerializerFactory(get_current_app)
serializer_factory.register_serializer(CELERY_SERIALIZER_NAME)

# If ``celery`` is in the executable path and ``beat`` is in the arguments
# then this code is running in a celery beat process
IS_BEAT_PROCESS = "celery" in argv[0] and "beat" in argv

# set up celery with our custom Task which handles async/sync tasks + app context
celery = Celery(__name__)
celery.Task = HybridAppContextTask
celery = Celery(__name__, task_cls=HybridAppContextTask if IS_BEAT_PROCESS else HybridAppContextWorkerTask)


def init_celery(app):
Expand Down
37 changes: 24 additions & 13 deletions superdesk/celery_app/context_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import werkzeug

from celery import Task
from typing import Any, Tuple, Dict
from typing import Any

from superdesk.logging import logger
from superdesk.errors import SuperdeskError
Expand All @@ -14,6 +14,8 @@ class HybridAppContextTask(Task):
"""
A task class that supports running both synchronous and asynchronous tasks within the Flask application context.
It handles exceptions specifically defined in `app_errors` and logs them.
Note: For use with celery beat process
"""

abstract = True
Expand Down Expand Up @@ -86,24 +88,13 @@ async def wrapper():
task.add_done_callback(background_tasks.discard)
return task

async def apply_async(self, args: Tuple = (), kwargs: Dict = {}, **other_kwargs) -> Any:
"""
Schedules the task asynchronously. Awaits the result if `CELERY_TASK_ALWAYS_EAGER` is True.
"""
# directly run and await the task if eager
if self._is_always_eager():
async_result = super().apply_async(args=args, kwargs=kwargs, **other_kwargs)
return await async_result.get()

return super().apply_async(args=args, kwargs=kwargs, **other_kwargs)

def handle_exception(self, exc: Exception) -> None:
"""
Logs an exception using the configured logger from `superdesk.logging`.
"""
logger.exception(f"Error handling task: {str(exc)}")

def on_failure(self, exc: Exception, task_id: str, args: Tuple, kwargs: Dict, einfo: str) -> None:
def on_failure(self, exc: Exception, task_id: str, args: tuple, kwargs: dict, einfo: str) -> None:
"""
Handles task failure by logging the exception within the Flask application context.
"""
Expand All @@ -114,3 +105,23 @@ def on_failure(self, exc: Exception, task_id: str, args: Tuple, kwargs: Dict, ei
def _is_always_eager(self):
app = self.get_current_app()
return app.config.get("CELERY_TASK_ALWAYS_EAGER", False)


class HybridAppContextWorkerTask(HybridAppContextTask):
"""
A task class that supports running both synchronous and asynchronous tasks within the Flask application context.
It handles exceptions specifically defined in `app_errors` and logs them.
Note: For use with celery worker and ASGI processes
"""

async def apply_async(self, args: tuple = (), kwargs: dict | None = None, **other_kwargs) -> Any:
"""
Schedules the task asynchronously. Awaits the result if `CELERY_TASK_ALWAYS_EAGER` is True.
"""
# directly run and await the task if eager
if self._is_always_eager():
async_result = super().apply_async(args=args, kwargs=kwargs, **other_kwargs)
return await async_result.get()

return super().apply_async(args=args, kwargs=kwargs, **other_kwargs)

0 comments on commit bc35562

Please sign in to comment.