diff --git a/lib/Resque.php b/lib/Resque.php index 53353107..ddc4a02a 100644 --- a/lib/Resque.php +++ b/lib/Resque.php @@ -210,9 +210,11 @@ public static function sizeWorking($queue) { * * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue */ - public static function enqueue($queue, $class, $args = null, $trackStatus = false) + public static function enqueue($queue, $class, $args = null, $trackStatus = false, $id = null) { - $id = Resque::generateJobId(); + if(!$id) { + $id = Resque::generateJobId(); + } $hookParams = array( 'class' => $class, 'args' => $args, diff --git a/lib/Resque/Job/Status.php b/lib/Resque/Job/Status.php index 03fb9122..87435151 100644 --- a/lib/Resque/Job/Status.php +++ b/lib/Resque/Job/Status.php @@ -12,6 +12,7 @@ class Resque_Job_Status const STATUS_RUNNING = 2; const STATUS_FAILED = 3; const STATUS_COMPLETE = 4; + const STATUS_DELAYED = 5; /** * @var string The ID of the job this status class refers back to. @@ -47,11 +48,12 @@ public function __construct($id) * all necessary keys in Redis to monitor the status of a job. * * @param string $id The ID of the job to monitor the status of. + * @param int $startStatus The start status of the job - defaults to STATUS_WAITING */ - public static function create($id) + public static function create($id, $startStatus = Resque_Job_Status::STATUS_WAITING) { $statusPacket = array( - 'status' => self::STATUS_WAITING, + 'status' => $startStatus, 'updated' => time(), self::statusToString(self::STATUS_WAITING) => time(), 'started' => time(), diff --git a/lib/ResqueScheduler.php b/lib/ResqueScheduler.php index 691f1002..b103bfb0 100644 --- a/lib/ResqueScheduler.php +++ b/lib/ResqueScheduler.php @@ -21,10 +21,11 @@ class ResqueScheduler * @param string $queue The name of the queue to place the job in. * @param string $class The name of the class that contains the code to execute the job. * @param array $args Any optional arguments that should be passed when the job is executed. + * @param boolean $trackStatus Set to true to be able to monitor the status of a job. */ - public static function enqueueIn($in, $queue, $class, array $args = array()) + public static function enqueueIn($in, $queue, $class, array $args = array(), $trackStatus = false) { - self::enqueueAt(time() + $in, $queue, $class, $args); + return self::enqueueAt(time() + $in, $queue, $class, $args, $trackStatus); } /** @@ -38,20 +39,29 @@ public static function enqueueIn($in, $queue, $class, array $args = array()) * @param string $queue The name of the queue to place the job in. * @param string $class The name of the class that contains the code to execute the job. * @param array $args Any optional arguments that should be passed when the job is executed. + * @param boolean $trackStatus Set to true to be able to monitor the status of a job. */ - public static function enqueueAt($at, $queue, $class, $args = array()) + public static function enqueueAt($at, $queue, $class, $args = array(), $trackStatus = false) { self::validateJob($class, $queue); - $job = self::jobToHash($queue, $class, $args); + $id = Resque::generateJobId(); + $job = self::jobToHash($queue, $class, $args, $id, $trackStatus); self::delayedPush($at, $job); + if($trackStatus) { + Resque_Job_Status::create($id, Resque_Job_Status::STATUS_DELAYED); + } + Resque_Event::trigger('afterSchedule', array( 'at' => $at, 'queue' => $queue, 'class' => $class, 'args' => $args, + 'id' => $id )); + + return $id; } /** @@ -91,6 +101,7 @@ public static function getDelayedTimestampSize($timestamp) return Resque::redis()->llen('delayed:' . $timestamp, $timestamp); } + // TODO: Re-implement removeDelayed after adding status tracking to job hash /** * Remove a delayed job from the queue * @@ -106,7 +117,7 @@ public static function getDelayedTimestampSize($timestamp) * @param $args * @return int number of jobs that were removed */ - public static function removeDelayed($queue, $class, $args) + /*public static function removeDelayed($queue, $class, $args) { $destroyed=0; $item=json_encode(self::jobToHash($queue, $class, $args)); @@ -119,7 +130,7 @@ public static function removeDelayed($queue, $class, $args) } return $destroyed; - } + }*/ /** * removed a delayed job queued for a specific timestamp @@ -134,7 +145,7 @@ public static function removeDelayed($queue, $class, $args) * @param $args * @return mixed */ - public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args) + /*public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, $args) { $key = 'delayed:' . self::getTimestamp($timestamp); $item = json_encode(self::jobToHash($queue, $class, $args)); @@ -143,7 +154,7 @@ public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, self::cleanupTimestamp($key, $timestamp); return $count; - } + }*/ /** * Generate hash of all job properties to be saved in the scheduled queue. @@ -153,12 +164,14 @@ public static function removeDelayedJobFromTimestamp($timestamp, $queue, $class, * @param array $args Array of job arguments. */ - private static function jobToHash($queue, $class, $args) + private static function jobToHash($queue, $class, $args, $id, $trackStatus) { return array( 'class' => $class, 'args' => array($args), 'queue' => $queue, + 'id' => $id, + 'trackStatus' => $trackStatus ); } diff --git a/lib/ResqueScheduler/Worker.php b/lib/ResqueScheduler/Worker.php index 61926deb..f227d5b8 100644 --- a/lib/ResqueScheduler/Worker.php +++ b/lib/ResqueScheduler/Worker.php @@ -81,7 +81,7 @@ public function enqueueDelayedItemsForTimestamp($timestamp) 'args' => $item['args'], )); - $payload = array_merge(array($item['queue'], $item['class']), $item['args']); + $payload = array_merge(array($item['queue'], $item['class']), $item['args'], array($item['trackStatus'], $item['id'])); call_user_func_array('Resque::enqueue', $payload); } }