Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impl: signature in periodic task #361

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
ebe6975
feature: PeriodicTask by signature
SunnyCapt Sep 2, 2020
0b91eca
feature: Sign field
SunnyCapt Sep 3, 2020
967f200
Bugfix: id_rsa.pubpermissions & key files creating
SunnyCapt Sep 3, 2020
93431fb
Feature: callback in periodic task (support for option)
SunnyCapt Sep 3, 2020
f8e3f17
Fix: sign hash of serialized task signature
SunnyCapt Sep 7, 2020
6388db4
added tests of periodic tasks with task signatures
SunnyCapt Sep 8, 2020
7b02ec0
refactor of keys loading
SunnyCapt Sep 10, 2020
1d96917
Fix deprication warnings in tests and refactor key generating & loading
SunnyCapt Sep 10, 2020
834a432
Update authors file
SunnyCapt Sep 10, 2020
18b0290
Merge branch 'master' into master
SunnyCapt Oct 25, 2020
89cb908
fix imports
SunnyCapt Oct 26, 2020
e7fdac4
useless commit
SunnyCapt Oct 26, 2020
9b0c91e
Merge remote-tracking branch 'celery/master' into master
SunnyCapt Dec 6, 2020
7c9479c
call some functions before calling real apply_async
SunnyCapt Apr 20, 2021
28d6391
added comments about app.conf.call_before_run_periodic_task
SunnyCapt Apr 21, 2021
c85b77d
django_celery_beat.schedulers.DatabaseScheduler.apply_async refactoring
SunnyCapt Apr 21, 2021
bd74d99
Merge branch 'master' of https://github.com/celery/django-celery-beat
SunnyCapt Apr 21, 2021
7bfca12
fix schedulers.py imports
SunnyCapt Apr 21, 2021
02778de
fix performing an action before starting a periodic task
SunnyCapt Apr 21, 2021
2604ab9
fix tests
SunnyCapt Apr 21, 2021
f2ec316
added readable info about serialized task
SunnyCapt Apr 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,4 @@ Wes Winham <[email protected]>
Williams Mendez <[email protected]>
WoLpH <[email protected]>
dongweiming <[email protected]>
SunnyCapt <[email protected]>
22 changes: 15 additions & 7 deletions django_celery_beat/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PeriodicTaskForm(forms.ModelForm):
required=False,
max_length=200,
)
# todo: add field for task_signature

class Meta:
"""Form metadata."""
Expand Down Expand Up @@ -198,19 +199,26 @@ def toggle_tasks(self, request, queryset):

def run_tasks(self, request, queryset):
self.celery_app.loader.import_default_modules()
tasks = [(self.celery_app.tasks.get(task.task),
loads(task.args),
loads(task.kwargs),
task.queue)
for task in queryset]
tasks = [
(
task.get_verified_task_signature(raise_exceptions=False)
if task.task_signature is not None
else self.celery_app.tasks.get(task.task),
loads(task.args),
loads(task.kwargs),
task.queue
) for task in queryset
]

if any(t[0] is None for t in tasks):
for i, t in enumerate(tasks):
if t[0] is None:
break

# variable "i" will be set because list "tasks" is not empty
not_found_task_name = queryset[i].task
not_found_task_name = queryset[i].get_verified_task_signature(raise_exceptions=False).name \
if queryset[i].task_signature is not None and queryset[i].get_verified_task_signature(
raise_exceptions=False) is not None else queryset[i].task

self.message_user(
request,
Expand All @@ -222,7 +230,7 @@ def run_tasks(self, request, queryset):
task_ids = [task.apply_async(args=args, kwargs=kwargs, queue=queue)
if queue and len(queue)
else task.apply_async(args=args, kwargs=kwargs)
for task, args, kwargs, queue in tasks]
for task, args, kwargs, queue in tasks if task is not None]
tasks_run = len(task_ids)
self.message_user(
request,
Expand Down
23 changes: 23 additions & 0 deletions django_celery_beat/migrations/0015_periodictask_task_signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 2.2.16 on 2020-09-01 10:17

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_celery_beat', '0014_remove_clockedschedule_enabled'),
]

operations = [
migrations.AddField(
model_name='periodictask',
name='task_signature',
field=models.BinaryField(help_text="Serialized `celery.canvas.Signature` type's object of task (or chain, group, etc.) got by https://pypi.org/project/dill/", null=True),
),
migrations.AddField(
model_name='periodictask',
name='task_signature_sign',
field=models.CharField(help_text="Signature (in hex) of serialized `celery.canvas.Signature` type's object (see task_signature field)", max_length=1028, null=True),
),
]
23 changes: 23 additions & 0 deletions django_celery_beat/migrations/0016_auto_20200903_1356.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 2.2.16 on 2020-09-03 13:56

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('django_celery_beat', '0015_periodictask_task_signature'),
]

operations = [
migrations.AddField(
model_name='periodictask',
name='callback_signature',
field=models.BinaryField(help_text="Serialized `celery.canvas.Signature` type's callback task got by https://pypi.org/project/dill/ (use as link arg in `.apply_async` method)", null=True),
),
migrations.AddField(
model_name='periodictask',
name='callback_signature_sign',
field=models.CharField(help_text="Signature (in hex) of serialized `celery.canvas.Signature` type's callback task (see callback_signature field)", max_length=1028, null=True),
),
]
79 changes: 75 additions & 4 deletions django_celery_beat/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Database models."""
from datetime import timedelta

import dill
import timezone_field
from celery import schedules, current_app
from celery.utils.log import get_logger
from django.conf import settings
from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
Expand All @@ -11,10 +13,11 @@
from django.utils.translation import gettext_lazy as _

from . import managers, validators
from .tzcrontab import TzAwareCrontab
from .utils import make_aware, now
from .clockedschedule import clocked
from .tzcrontab import TzAwareCrontab
from .utils import make_aware, now, verify_task_signature

logger = get_logger(__name__)

DAYS = 'days'
HOURS = 'hours'
Expand Down Expand Up @@ -389,6 +392,27 @@ class PeriodicTask(models.Model):
help_text=_('The Name of the Celery Task that Should be Run. '
'(Example: "proj.tasks.import_contacts")'),
)
task_signature = models.BinaryField(
null=True,
help_text='Serialized `celery.canvas.Signature` type\'s object of task (or chain, group, '
'etc.) got by https://pypi.org/project/dill/'
)
callback_signature = models.BinaryField(
null=True,
help_text='Serialized `celery.canvas.Signature` type\'s callback task got '
'by https://pypi.org/project/dill/ (use as link arg in `.apply_async` method)'
) # todo: add support for error_callback (link_error option)
task_signature_sign = models.CharField(
null=True,
max_length=1028,
help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s object (see task_signature field)'
)
callback_signature_sign = models.CharField(
null=True,
max_length=1028,
help_text='Signature (in hex) of serialized `celery.canvas.Signature` type\'s callback '
'task (see callback_signature field)'
)

# You can only set ONE of the following schedule FK's
# TODO: Redo this as a GenericForeignKey
Expand Down Expand Up @@ -549,8 +573,8 @@ def validate_unique(self, *args, **kwargs):
'must be set.'
)

err_msg = 'Only one of clocked, interval, crontab, '\
'or solar must be set'
err_msg = 'Only one of clocked, interval, crontab, ' \
'or solar must be set'
if len(selected_schedule_types) > 1:
error_info = {}
for selected_schedule_type in selected_schedule_types:
Expand Down Expand Up @@ -579,6 +603,53 @@ def _clean_expires(self):
_('Only one can be set, in expires and expire_seconds')
)

def get_verified_task_signature(self, raise_exceptions=True):
try:
self.get_verified_callback_signature()
except ValueError as e:
err = 'Wrong callback: {} [{}]'.format(e, self)
logger.error(err)
if raise_exceptions:
raise ValueError(err)
return None

return self._get_verified_obj_signature('task', raise_exceptions)

def get_verified_callback_signature(self, raise_exceptions=True):
return self._get_verified_obj_signature('callback', raise_exceptions)

def _get_verified_obj_signature(self, object_name, raise_exceptions):
assert object_name in ('task', 'callback'), ValueError('Unknown object_name')

obj_signarute = getattr(self, '{}_signature'.format(object_name), None)
obj_signarute_sign = getattr(self, '{}_signature_sign'.format(object_name), None)

if obj_signarute is None:
return None

if obj_signarute_sign is None:
err = 'Not found `{}_signature_sign` for `{}` (use django_celery_be' \
'at.utils.sign to sign). Task disabled.'.format(object_name, self)
self.enabled = False
self.save(update_fields=['enabled'])
logger.error(err)
if raise_exceptions:
raise ValueError(err)
return None

obj_signarute = bytes(obj_signarute)

if not verify_task_signature(obj_signarute, obj_signarute_sign):
err = 'Wrong sign for `{}`. Task disabled.'.format(self)
self.enabled = False
self.save(update_fields=['enabled'])
logger.error(err)
if raise_exceptions:
raise ValueError(err)
return None

return dill.loads(obj_signarute)

@property
def expires_(self):
return self.expires or self.expire_seconds
Expand Down
56 changes: 46 additions & 10 deletions django_celery_beat/schedulers.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
"""Beat Scheduler Implementation."""
from __future__ import absolute_import, unicode_literals

import datetime
import logging
import math

from multiprocessing.util import Finalize
import sys

from celery import current_app
from celery import schedules
from celery.beat import Scheduler, ScheduleEntry

# noinspection PyProtectedMember
from celery.beat import Scheduler, ScheduleEntry, SchedulingError, BeatLazyFunc
from celery.exceptions import reraise
# from celery.utils.encoding import safe_str, safe_repr
from celery.utils.log import get_logger
from celery.utils.time import maybe_make_aware
from kombu.utils.encoding import safe_str, safe_repr
from kombu.utils.json import dumps, loads

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
# noinspection PyProtectedMember
from django.db import transaction, close_old_connections
from django.db.utils import DatabaseError, InterfaceError
from django.core.exceptions import ObjectDoesNotExist
from kombu.utils.encoding import safe_str, safe_repr
from kombu.utils.json import dumps, loads

from .clockedschedule import clocked
from .models import (
PeriodicTask, PeriodicTasks,
CrontabSchedule, IntervalSchedule,
SolarSchedule, ClockedSchedule
)
from .clockedschedule import clocked
from .utils import NEVER_CHECK_TIMEOUT

# This scheduler must wake up more frequently than the
Expand Down Expand Up @@ -56,6 +59,8 @@ def __init__(self, model, app=None):
self.app = app or current_app._get_current_object()
self.name = model.name
self.task = model.task
self.task_signature = model.get_verified_task_signature()

try:
self.schedule = model.schedule
except model.DoesNotExist:
Expand All @@ -74,7 +79,10 @@ def __init__(self, model, app=None):
)
self._disable(model)

self.options = {}
self.options = {
'link': model.get_verified_callback_signature()
}

for option in ['queue', 'exchange', 'routing_key', 'priority']:
value = getattr(model, option)
if value is None:
Expand Down Expand Up @@ -229,6 +237,7 @@ def __init__(self, *args, **kwargs):
"""Initialize the database scheduler."""
self._dirty = set()
Scheduler.__init__(self, *args, **kwargs)
# noinspection PyUnresolvedReferences
self._finalize = Finalize(self, self.sync, exitpriority=5)
self.max_interval = (
kwargs.get('max_interval')
Expand Down Expand Up @@ -368,3 +377,30 @@ def schedule(self):
repr(entry) for entry in self._schedule.values()),
)
return self._schedule

def apply_async(self, entry, producer=None, advance=True, **kwargs):
# Update time-stamps and run counts before we actually execute,
# so we have that done if an exception is raised (doesn't schedule
# forever.)
entry = self.reserve(entry) if advance else entry
task = entry.task_signature if entry.task_signature is not None else self.app.tasks.get(entry.task)

try:
entry_args = [v() if isinstance(v, BeatLazyFunc) else v for v in (entry.args or [])]
entry_kwargs = {k: v() if isinstance(v, BeatLazyFunc) else v for k, v in entry.kwargs.items()}
if task:
return task.apply_async(entry_args, entry_kwargs,
producer=producer,
**entry.options)
else:
return self.send_task(entry.task, entry_args, entry_kwargs,
producer=producer,
**entry.options)
except Exception as exc: # pylint: disable=broad-except
reraise(SchedulingError, SchedulingError(
"Couldn't apply scheduled task {0.name}: {exc}".format(
entry, exc=exc)), sys.exc_info()[2])
finally:
self._tasks_since_sync += 1
if self.should_sync():
self._do_sync()
Loading