Skip to content

Commit

Permalink
Add RRule support
Browse files Browse the repository at this point in the history
  • Loading branch information
PierrickBrun committed Nov 25, 2024
1 parent f7d5787 commit 7345391
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
49 changes: 40 additions & 9 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from redis import WatchError

from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until
from .utils import from_unix, to_unix, get_next_scheduled_time, get_next_rrule_scheduled_time, rationalize_until

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -298,6 +298,36 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
{job.id: to_unix(scheduled_time)})
return job


def rrule(self, rrule_string, func, args=None, kwargs=None, repeat=None,
queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False,
depends_on=None, on_success=None, on_failure=None, at_front: bool = False):
"""
Schedule a recurring job via RRule
"""
scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone)

job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
result_ttl=result_ttl, ttl=ttl, id=id, queue_name=queue_name,
description=description, timeout=timeout, meta=meta, depends_on=depends_on,
on_success=on_success, on_failure=on_failure)

job.meta['rrule_string'] = rrule_string
job.meta['use_local_timezone'] = use_local_timezone

if repeat is not None:
job.meta['repeat'] = int(repeat)

if at_front:
job.enqueue_at_front = True

job.save()

self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(scheduled_time)})
return job


def cancel(self, job):
"""
Pulls a job from the scheduler queue. This function accepts either a
Expand Down Expand Up @@ -415,6 +445,7 @@ def enqueue_job(self, job):
interval = job.meta.get('interval', None)
repeat = job.meta.get('repeat', None)
cron_string = job.meta.get('cron_string', None)
rrule_string = job.meta.get('rrule_string', None)
use_local_timezone = job.meta.get('use_local_timezone', None)

# If job is a repeated job, decrement counter
Expand All @@ -425,21 +456,21 @@ def enqueue_job(self, job):
queue.enqueue_job(job, at_front=bool(job.enqueue_at_front))
self.connection.zrem(self.scheduled_jobs_key, job.id)

# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
if interval:
# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(datetime.utcnow()) + int(interval)})
elif cron_string:
# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
next_scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone)
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(next_scheduled_time)})
elif rrule_string:
next_scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone)
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(next_scheduled_time)})

def enqueue_jobs(self):
"""
Expand Down
12 changes: 12 additions & 0 deletions rq_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import calendar
import crontab
import dateutil.tz
import dateutil.rrule
import dateutil.tz

from datetime import datetime, timedelta
import logging
Expand Down Expand Up @@ -30,6 +32,16 @@ def get_next_scheduled_time(cron_string, use_local_timezone=False):
return next_time.astimezone(tz)


def get_next_rrule_scheduled_time(rrule_string, use_local_timezone=False):
"""Calculate the next scheduled time by creating a rrule object
with a rrule string"""
now = datetime.now(tz=dateutil.tz.UTC)
rrule = dateutil.rrule.rrulestr(rrule_string)
next_time = rrule.after(now, inc=True)
tz = dateutil.tz.tzlocal() if use_local_timezone else dateutil.tz.UTC
return next_time.astimezone(tz)


def setup_loghandlers(level='INFO'):
logger = logging.getLogger('rq_scheduler.scheduler')
if not logger.handlers:
Expand Down

0 comments on commit 7345391

Please sign in to comment.