diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index c6b3861..0e3cb4c 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -15,7 +15,7 @@ from redis import WatchError -from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until +from .utils import from_unix, to_unix, get_next_scheduled_time, get_next_rrule_scheduled_time, rationalize_until logger = logging.getLogger(__name__) @@ -298,6 +298,36 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, {job.id: to_unix(scheduled_time)}) return job + + def rrule(self, rrule_string, func, args=None, kwargs=None, repeat=None, + queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False, + depends_on=None, on_success=None, on_failure=None, at_front: bool = False): + """ + Schedule a recurring job via RRule + """ + scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone) + + job = self._create_job(func, args=args, kwargs=kwargs, commit=False, + result_ttl=result_ttl, ttl=ttl, id=id, queue_name=queue_name, + description=description, timeout=timeout, meta=meta, depends_on=depends_on, + on_success=on_success, on_failure=on_failure) + + job.meta['rrule_string'] = rrule_string + job.meta['use_local_timezone'] = use_local_timezone + + if repeat is not None: + job.meta['repeat'] = int(repeat) + + if at_front: + job.enqueue_at_front = True + + job.save() + + self.connection.zadd(self.scheduled_jobs_key, + {job.id: to_unix(scheduled_time)}) + return job + + def cancel(self, job): """ Pulls a job from the scheduler queue. This function accepts either a @@ -415,6 +445,7 @@ def enqueue_job(self, job): interval = job.meta.get('interval', None) repeat = job.meta.get('repeat', None) cron_string = job.meta.get('cron_string', None) + rrule_string = job.meta.get('rrule_string', None) use_local_timezone = job.meta.get('use_local_timezone', None) # If job is a repeated job, decrement counter @@ -425,21 +456,21 @@ def enqueue_job(self, job): queue.enqueue_job(job, at_front=bool(job.enqueue_at_front)) self.connection.zrem(self.scheduled_jobs_key, job.id) + # If this is a repeat job and counter has reached 0, don't repeat + if repeat is not None: + if job.meta['repeat'] == 0: + return if interval: - # If this is a repeat job and counter has reached 0, don't repeat - if repeat is not None: - if job.meta['repeat'] == 0: - return self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(datetime.utcnow()) + int(interval)}) elif cron_string: - # If this is a repeat job and counter has reached 0, don't repeat - if repeat is not None: - if job.meta['repeat'] == 0: - return next_scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(next_scheduled_time)}) + elif rrule_string: + next_scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone) + self.connection.zadd(self.scheduled_jobs_key, + {job.id: to_unix(next_scheduled_time)}) def enqueue_jobs(self): """ diff --git a/rq_scheduler/utils.py b/rq_scheduler/utils.py index c34ab02..6562d26 100644 --- a/rq_scheduler/utils.py +++ b/rq_scheduler/utils.py @@ -1,6 +1,8 @@ import calendar import crontab import dateutil.tz +import dateutil.rrule +import dateutil.tz from datetime import datetime, timedelta import logging @@ -30,6 +32,16 @@ def get_next_scheduled_time(cron_string, use_local_timezone=False): return next_time.astimezone(tz) +def get_next_rrule_scheduled_time(rrule_string, use_local_timezone=False): + """Calculate the next scheduled time by creating a rrule object + with a rrule string""" + now = datetime.now(tz=dateutil.tz.UTC) + rrule = dateutil.rrule.rrulestr(rrule_string) + next_time = rrule.after(now, inc=True) + tz = dateutil.tz.tzlocal() if use_local_timezone else dateutil.tz.UTC + return next_time.astimezone(tz) + + def setup_loghandlers(level='INFO'): logger = logging.getLogger('rq_scheduler.scheduler') if not logger.handlers: